[1/2] incubator-beam git commit: Add support for date partitioned table names
Repository: incubator-beam Updated Branches: refs/heads/python-sdk bb09c07b6 -> 409d067b3 Add support for date partitioned table names These names have the format "tablename$YYYYmmdd". Previously the dollar sign caused this to be deemed invalid. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1af871a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1af871a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1af871a Branch: refs/heads/python-sdk Commit: a1af871a0c8c92a6d84f2e9950615f7737118d7e Parents: bb09c07 Author: Kevin Graney Authored: Tue Dec 6 15:09:42 2016 -0500 Committer: Robert Bradshaw Committed: Wed Dec 21 15:16:45 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 6 ++++-- sdks/python/apache_beam/io/bigquery_test.py | 8 ++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index ce75e10..2059de4 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -275,7 +275,9 @@ def _parse_table_reference(table, dataset=None, project=None): then the table argument must contain the entire table reference: 'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a bigquery.TableReference instance in which case dataset and project are - ignored and the reference is returned as a result. + ignored and the reference is returned as a result. Additionally, for date + partitioned tables, appending '$YYYYmmdd' to the table name is supported, + e.g. 'DATASET.TABLE$YYYYmmdd'. dataset: The ID of the dataset containing this table or null if the table reference is specified entirely by the table argument. 
project: The ID of the project containing this table or null if the table @@ -300,7 +302,7 @@ def _parse_table_reference(table, dataset=None, project=None): # table name. if dataset is None: match = re.match( -r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>\w+)$', table) +r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table) if not match: raise ValueError( 'Expected a table reference (PROJECT:DATASET.TABLE or ' http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index a2cf947..f6f9363 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -208,6 +208,14 @@ class TestBigQuerySource(unittest.TestCase): self.assertEqual(source.query, 'my_query') self.assertIsNone(source.table_reference) + def test_date_partitioned_table_name(self): +source = beam.io.BigQuerySource('dataset.table$20030102', validate=True) +dd = DisplayData.create_from(source) +expected_items = [ +DisplayDataItemMatcher('validation', True), +DisplayDataItemMatcher('table', 'dataset.table$20030102')] +hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) + class TestBigQuerySink(unittest.TestCase):
[2/2] incubator-beam git commit: Closes #1534
Closes #1534 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/409d067b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/409d067b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/409d067b Branch: refs/heads/python-sdk Commit: 409d067b36036981e330a055b652bb74a93f4ca2 Parents: bb09c07 a1af871 Author: Robert BradshawAuthored: Wed Dec 21 15:16:46 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:16:46 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 6 -- sdks/python/apache_beam/io/bigquery_test.py | 8 2 files changed, 12 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3454d691f -> bb09c07b6 Fixing inconsistencies in PipelineOptions The following options have changed: * job_name - Default is 'beamapp-username-date-microseconds'. Test was added. * staging_location and temp_location - staging_location was the default of temp_location. Now it's the other way around, and the tests reflect that. * machine_type alias of worker_machine_type has been removed. * disk_type alias of worker_disk_type has been removed. * disk_source_image option has been removed. * no_save_main_session option has been removed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7 Branch: refs/heads/python-sdk Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1 Parents: 3454d69 Author: PabloAuthored: Tue Dec 6 18:01:54 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:14:52 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 45 .../apache_beam/internal/apiclient_test.py | 6 +++ sdks/python/apache_beam/utils/options.py| 33 ++ .../utils/pipeline_options_validator.py | 11 ++--- .../utils/pipeline_options_validator_test.py| 8 ++-- 5 files changed, 54 insertions(+), 49 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index f1341a7..3a9ba46 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -18,6 +18,8 @@ """Dataflow client utility functions.""" import codecs +from datetime import datetime +import getpass import json import logging import os @@ -46,10 +48,6 @@ from apache_beam.utils.options 
import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow -BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' -COMPUTE_API_SERVICE = 'compute.googleapis.com' -STORAGE_API_SERVICE = 'storage.googleapis.com' - class Step(object): """Wrapper for a dataflow Step protobuf.""" @@ -121,11 +119,13 @@ class Environment(object): self.worker_options = options.view_as(WorkerOptions) self.debug_options = options.view_as(DebugOptions) self.proto = dataflow.Environment() -self.proto.clusterManagerApiService = COMPUTE_API_SERVICE -self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE +self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE +self.proto.dataset = '{}/cloud_dataflow'.format( +GoogleCloudOptions.BIGQUERY_API_SERVICE) self.proto.tempStoragePrefix = ( -self.google_cloud_options.temp_location.replace('gs:/', -STORAGE_API_SERVICE)) +self.google_cloud_options.temp_location.replace( +'gs:/', +GoogleCloudOptions.STORAGE_API_SERVICE)) # User agent information. self.proto.userAgent = dataflow.Environment.UserAgentValue() self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint @@ -165,7 +165,7 @@ class Environment(object): dataflow.Package( location='%s/%s' % ( self.google_cloud_options.staging_location.replace( - 'gs:/', STORAGE_API_SERVICE), + 'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE), package), name=package)) @@ -174,7 +174,7 @@ class Environment(object): packages=package_descriptors, taskrunnerSettings=dataflow.TaskRunnerSettings( parallelWorkerSettings=dataflow.WorkerSettings( -baseUrl='https://dataflow.googleapis.com', +baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT, servicePath=self.google_cloud_options.dataflow_endpoint))) pool.autoscalingSettings = dataflow.AutoscalingSettings() # Set worker pool options received through command line. 
@@ -195,8 +195,6 @@ class Environment(object): pool.diskSizeGb = self.worker_options.disk_size_gb if self.worker_options.disk_type: pool.diskType = self.worker_options.disk_type -if self.worker_options.disk_source_image: - pool.diskSourceImage = self.worker_options.disk_source_image if self.worker_options.zone: pool.zone = self.worker_options.zone if
[2/2] incubator-beam git commit: Closes #1526
Closes #1526 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb09c07b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb09c07b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb09c07b Branch: refs/heads/python-sdk Commit: bb09c07b6351dcc53c0bdc8bf1259261ad2edfba Parents: 3454d69 35e2fdc Author: Robert BradshawAuthored: Wed Dec 21 15:15:20 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 21 15:15:20 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 45 .../apache_beam/internal/apiclient_test.py | 6 +++ sdks/python/apache_beam/utils/options.py| 33 ++ .../utils/pipeline_options_validator.py | 11 ++--- .../utils/pipeline_options_validator_test.py| 8 ++-- 5 files changed, 54 insertions(+), 49 deletions(-) --
[1/2] incubator-beam git commit: Update Apitools to version 0.5.6
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e26527873 -> 3b4fd5c7d Update Apitools to version 0.5.6 This brings in the fix to https://github.com/google/apitools/pull/136 needed for the BigQuery reader. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63074312 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63074312 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63074312 Branch: refs/heads/python-sdk Commit: 63074312aeb44b5db7f4e914c64864483f6a6510 Parents: e265278 Author: Sourabh BajajAuthored: Sat Dec 3 09:02:32 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 17:11:58 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63074312/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 033afc7..f6357b6 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -86,7 +86,7 @@ else: REQUIRED_PACKAGES = [ 'avro>=1.7.7,<2.0.0', 'dill>=0.2.5,<0.3', -'google-apitools>=0.5.2,<1.0.0', +'google-apitools>=0.5.6,<1.0.0', 'googledatastore==6.4.1', 'httplib2>=0.8,<0.10', 'mock>=1.0.1,<3.0.0',
[2/2] incubator-beam git commit: Closes #1501
Closes #1501 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b4fd5c7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b4fd5c7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b4fd5c7 Branch: refs/heads/python-sdk Commit: 3b4fd5c7d962987405dc157e6b84788af61f6413 Parents: e265278 6307431 Author: Robert BradshawAuthored: Thu Dec 15 17:12:55 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 17:12:55 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Rename PTransform.apply() to PTransform.expand()
est_combine_globally_with_default_side_input(self): class CombineWithSideInput(PTransform): - def apply(self, pcoll): + def expand(self, pcoll): side = pcoll | CombineGlobally(sum).as_singleton_view() main = pcoll.pipeline | Create([None]) return main | Map(lambda _, s: s, side) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/core.py -- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 523c5a6..0ba1c62 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -598,7 +598,7 @@ class ParDo(PTransformWithSideInputs): label='Transform Function'), 'fn_dd': self.fn} - def apply(self, pcoll): + def expand(self, pcoll): self.side_output_tags = set() # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. self.dofn = self.fn @@ -641,7 +641,7 @@ class _MultiParDo(PTransform): self._tags = tags self._main_tag = main_tag - def apply(self, pcoll): + def expand(self, pcoll): _ = pcoll | self._do_transform return pvalue.DoOutputsTuple( pcoll.pipeline, self._do_transform, self._tags, self._main_tag) @@ -854,7 +854,7 @@ class CombineGlobally(PTransform): def as_singleton_view(self): return self.clone(as_view=True) - def apply(self, pcoll): + def expand(self, pcoll): def add_input_types(transform): type_hints = self.get_type_hints() if type_hints.input_types: @@ -939,7 +939,7 @@ class CombinePerKey(PTransformWithSideInputs): def process_argspec_fn(self): return self.fn._fn # pylint: disable=protected-access - def apply(self, pcoll): + def expand(self, pcoll): args, kwargs = util.insert_values_in_args( self.args, self.kwargs, self.side_inputs) return pcoll | GroupByKey() | CombineValues('Combine', @@ -952,7 +952,7 @@ class CombineValues(PTransformWithSideInputs): def make_fn(self, fn): return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn) - def apply(self, pcoll): + def expand(self, 
pcoll): args, kwargs = util.insert_values_in_args( self.args, self.kwargs, self.side_inputs) @@ -1083,7 +1083,7 @@ class GroupByKey(PTransform): timer_window, name, time_domain, fire_time, state): yield wvalue.with_value((k, wvalue.value)) - def apply(self, pcoll): + def expand(self, pcoll): # This code path is only used in the local direct runner. For Dataflow # runner execution, the GroupByKey transform is expanded on the service. input_type = pcoll.element_type @@ -1132,7 +1132,7 @@ class GroupByKeyOnly(PTransform): key_type, value_type = trivial_inference.key_value_types(input_type) return KV[key_type, Iterable[value_type]] - def apply(self, pcoll): + def expand(self, pcoll): self._check_pcollection(pcoll) return pvalue.PCollection(pcoll.pipeline) @@ -1170,7 +1170,7 @@ class Partition(PTransformWithSideInputs): def make_fn(self, fn): return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn) - def apply(self, pcoll): + def expand(self, pcoll): n = int(self.args[0]) return pcoll | ParDo( self.ApplyPartitionFnFn(), self.fn, *self.args, @@ -1261,14 +1261,14 @@ class WindowInto(ParDo): def infer_output_type(self, input_type): return input_type - def apply(self, pcoll): + def expand(self, pcoll): input_type = pcoll.element_type if input_type is not None: output_type = input_type self.with_input_types(input_type) self.with_output_types(output_type) -return super(WindowInto, self).apply(pcoll) +return super(WindowInto, self).expand(pcoll) # Python's pickling is broken for nested classes. 
@@ -1305,7 +1305,7 @@ class Flatten(PTransform): raise ValueError('Input to Flatten must be an iterable.') return pvalueish, pvalueish - def apply(self, pcolls): + def expand(self, pcolls): for pcoll in pcolls: self._check_pcollection(pcoll) return pvalue.PCollection(self.pipeline) @@ -1345,7 +1345,7 @@ class Create(PTransform): else: return Union[[trivial_inference.instance_to_type(v) for v in self.value]] - def apply(self, pbegin): + def expand(self, pbegin): assert isinstance(pbegin, pvalue.PBegin) self.pipeline = pbegin.pipeline return pvalue.PCollection(self.pipeline) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform.py -- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 2212d00..1bd7fb4 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apa
[2/2] incubator-beam git commit: Closes #1634
Closes #1634 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2652787 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2652787 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2652787 Branch: refs/heads/python-sdk Commit: e2652787355d4c322138f55ae2c54494ec592e59 Parents: d3c8874 e62249a Author: Robert BradshawAuthored: Thu Dec 15 16:52:39 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 16:52:39 2016 -0800 -- sdks/python/README.md | 2 +- .../examples/complete/autocomplete.py | 2 +- .../examples/complete/estimate_pi.py| 2 +- .../apache_beam/examples/complete/tfidf.py | 2 +- .../examples/complete/top_wikipedia_sessions.py | 6 ++--- .../examples/cookbook/custom_ptransform.py | 2 +- .../examples/cookbook/multiple_output_pardo.py | 2 +- .../apache_beam/examples/snippets/snippets.py | 16 ++--- .../examples/snippets/snippets_test.py | 2 +- .../apache_beam/examples/wordcount_debugging.py | 2 +- sdks/python/apache_beam/io/avroio.py| 4 ++-- .../apache_beam/io/datastore/v1/datastoreio.py | 4 ++-- sdks/python/apache_beam/io/iobase.py| 6 ++--- sdks/python/apache_beam/io/textio.py| 4 ++-- sdks/python/apache_beam/pipeline_test.py| 4 ++-- .../runners/dataflow/native_io/iobase.py| 2 +- .../apache_beam/runners/direct/direct_runner.py | 2 +- sdks/python/apache_beam/runners/runner.py | 4 ++-- sdks/python/apache_beam/transforms/combiners.py | 14 ++-- .../apache_beam/transforms/combiners_test.py| 2 +- sdks/python/apache_beam/transforms/core.py | 24 ++-- .../python/apache_beam/transforms/ptransform.py | 10 .../apache_beam/transforms/ptransform_test.py | 6 ++--- .../python/apache_beam/transforms/sideinputs.py | 10 sdks/python/apache_beam/transforms/util.py | 4 ++-- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py| 2 +- 27 files changed, 71 insertions(+), 71 deletions(-) --
[2/2] incubator-beam git commit: Closes #1617
Closes #1617 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3c88748 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3c88748 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3c88748 Branch: refs/heads/python-sdk Commit: d3c88748099fcccb27aef67c5c390d0bc67ebeb0 Parents: e383c77 0a558c7 Author: Robert BradshawAuthored: Thu Dec 15 16:35:59 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 15 16:35:59 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Update the BQ export flat from Json to Avro
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e383c7715 -> d3c887480 Update the BQ export flat from Json to Avro Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a558c71 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a558c71 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a558c71 Branch: refs/heads/python-sdk Commit: 0a558c7171d6e4452d88ecffd16a024a19cbfc42 Parents: e383c77 Author: Sourabh BajajAuthored: Wed Dec 14 11:44:46 2016 -0800 Committer: Sourabh Bajaj Committed: Wed Dec 14 11:44:46 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a558c71/sdks/python/apache_beam/runners/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 8b953b0..a3f7d94 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -520,7 +520,7 @@ class DataflowPipelineRunner(PipelineRunner): elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) elif transform.source.format == 'bigquery': - step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON') + step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO') # TODO(silviuc): Add table validation if transform.source.validate. if transform.source.table_reference is not None: step.add_property(PropertyNames.BIGQUERY_DATASET,
[2/2] incubator-beam git commit: Closes #1591
Closes #1591 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e383c771 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e383c771 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e383c771 Branch: refs/heads/python-sdk Commit: e383c77151bcfb61e3394c5dd040a425aa246bec Parents: f086afe e36ee8d Author: Robert BradshawAuthored: Tue Dec 13 11:09:13 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 13 11:09:13 2016 -0800 -- sdks/python/apache_beam/runners/direct/transform_evaluator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Do not test pickling native sink objects
Repository: incubator-beam Updated Branches: refs/heads/python-sdk f086afe12 -> e383c7715 Do not test pickling native sink objects Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e36ee8d7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e36ee8d7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e36ee8d7 Branch: refs/heads/python-sdk Commit: e36ee8d797965e2e2101cce046a14399f2008fc6 Parents: f086afe Author: Ahmet AltayAuthored: Mon Dec 12 19:09:37 2016 -0800 Committer: Ahmet Altay Committed: Mon Dec 12 19:09:37 2016 -0800 -- sdks/python/apache_beam/runners/direct/transform_evaluator.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e36ee8d7/sdks/python/apache_beam/runners/direct/transform_evaluator.py -- diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py index 7a9a31f..24ab754 100644 --- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py +++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py @@ -505,8 +505,7 @@ class _NativeWriteEvaluator(_TransformEvaluator): side_inputs) assert applied_ptransform.transform.sink -# TODO(aaltay): Consider storing the serialized form as an optimization. -self._sink = pickler.loads(pickler.dumps(applied_ptransform.transform.sink)) +self._sink = applied_ptransform.transform.sink @property def _is_final_bundle(self):
[1/2] incubator-beam git commit: [BEAM-1124] Temporarily Ignore a ValidatesRunnerTest That Broke Postcommit
Repository: incubator-beam Updated Branches: refs/heads/python-sdk b265dceaa -> f086afe12 [BEAM-1124] Temporarily Ignore a ValidatesRunnerTest That Broke Postcommit Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a70e581 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a70e581 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a70e581 Branch: refs/heads/python-sdk Commit: 0a70e581bef4258dfb18f3c5db8f9a9369ab13e8 Parents: b265dce Author: Mark LiuAuthored: Fri Dec 9 16:55:45 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 12 14:19:57 2016 -0800 -- sdks/python/apache_beam/dataflow_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a70e581/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index ba3553a..f410230 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -176,7 +176,9 @@ class DataflowTest(unittest.TestCase): assert_that(result, equal_to([(1, 'empty'), (2, 'empty')])) pipeline.run() - @attr('ValidatesRunner') + # @attr('ValidatesRunner') + # TODO(BEAM-1124): Temporarily disable it due to test failed running on + # Dataflow service. def test_multi_valued_singleton_side_input(self): pipeline = TestPipeline() pcol = pipeline | 'start' >> Create([1, 2])
[2/2] incubator-beam git commit: Closes #1571
Closes #1571 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f086afe1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f086afe1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f086afe1 Branch: refs/heads/python-sdk Commit: f086afe125cc413e7626ab186268d5bb3c067a89 Parents: b265dce 0a70e58 Author: Robert BradshawAuthored: Mon Dec 12 14:19:58 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 12 14:19:58 2016 -0800 -- sdks/python/apache_beam/dataflow_test.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Add more documentation to datastore_wordcount example
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 8eae855d6 -> b265dceaa Add more documentation to datastore_wordcount example Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/62b8095e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/62b8095e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/62b8095e Branch: refs/heads/python-sdk Commit: 62b8095e7164a316b8ae93c7fefa41d38ee255a8 Parents: 8eae855 Author: Vikas KedigehalliAuthored: Wed Dec 7 14:14:41 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:29:02 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 46 +++- 1 file changed, 44 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/62b8095e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py index eb62614..9613402 100644 --- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -15,7 +15,49 @@ # limitations under the License. # -"""A word-counting workflow that uses Google Cloud Datastore.""" +"""A word-counting workflow that uses Google Cloud Datastore. + +This example shows how to use ``datastoreio`` to read from and write to +Google Cloud Datastore. Note that running this example may incur charge for +Cloud Datastore operations. + +See https://developers.google.com/datastore/ for more details on Google Cloud +Datastore. +See http://beam.incubator.apache.org/get-started/quickstart on +how to run a Beam pipeline. + +Read-only Mode: In this mode, this example reads Cloud Datastore entities using +the ``datastoreio.ReadFromDatastore`` transform, extracts the words, +counts them and write the output to a set of files. 
+ +The following options must be provided to run this pipeline in read-only mode: +`` +--project YOUR_PROJECT_ID +--kind YOUR_DATASTORE_KIND +--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH] +--read-only +`` + +Read-write Mode: In this mode, this example reads words from an input file, +converts them to Cloud Datastore ``Entity`` objects and writes them to +Cloud Datastore using the ``datastoreio.Write`` transform. The second pipeline +will then read these Cloud Datastore entities using the +``datastoreio.ReadFromDatastore`` transform, extract the words, count them and +write the output to a set of files. + +The following options must be provided to run this pipeline in read-write mode: +`` +--project YOUR_PROJECT_ID +--kind YOUR_DATASTORE_KIND +--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH] +`` + +Note: We are using the Cloud Datastore protobuf objects directly because +that is the interface that the ``datastoreio`` exposes. +See the following links on more information about these protobuf messages. +https://cloud.google.com/datastore/docs/reference/rpc/google.datastore.v1 and +https://github.com/googleapis/googleapis/tree/master/google/datastore/v1 +""" from __future__ import absolute_import @@ -196,7 +238,7 @@ def run(argv=None): if not known_args.read_only: write_to_datastore(gcloud_options.project, known_args, pipeline_options) - # Read from Datastore. + # Read entities from Datastore. result = read_from_datastore(gcloud_options.project, known_args, pipeline_options)
[2/2] incubator-beam git commit: Closes #1540
Closes #1540 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b265dcea Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b265dcea Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b265dcea Branch: refs/heads/python-sdk Commit: b265dceaac93a50543151efb4c3168a8275e8b2d Parents: 8eae855 62b8095 Author: Robert BradshawAuthored: Fri Dec 9 11:29:03 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:29:03 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 46 +++- 1 file changed, 44 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Closes #1551
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 44c1586f3 -> 8eae855d6 Closes #1551 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8eae855d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8eae855d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8eae855d Branch: refs/heads/python-sdk Commit: 8eae855d608abb140c0e3ece3927765575b81cbb Parents: 44c1586 2dee686 Author: Robert BradshawAuthored: Fri Dec 9 11:28:02 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:28:02 2016 -0800 -- sdks/python/run_postcommit.sh | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) --
[2/2] incubator-beam git commit: [BEAM-1109] Fix Python Postcommit Test Timeout
[BEAM-1109] Fix Python Postcommit Test Timeout Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2dee6868 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2dee6868 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2dee6868 Branch: refs/heads/python-sdk Commit: 2dee68683ad6d18161d22cd71f8d90768d7fcb30 Parents: 44c1586 Author: Mark LiuAuthored: Thu Dec 8 00:06:06 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 9 11:28:02 2016 -0800 -- sdks/python/run_postcommit.sh | 5 + 1 file changed, 1 insertion(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2dee6868/sdks/python/run_postcommit.sh -- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 4da078f..2e00a03 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -71,11 +71,8 @@ python setup.py sdist SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz) # Run ValidatesRunner tests on Google Cloud Dataflow service -# processes -> number of processes to run tests in parallel -# process-timeout -> test timeout in seconds python setup.py nosetests \ - -a ValidatesRunner --processes=4 --process-timeout=360 \ - --test-pipeline-options=" \ + -a ValidatesRunner --test-pipeline-options=" \ --runner=BlockingDataflowPipelineRunner \ --project=$PROJECT \ --staging_location=$GCS_LOCATION/staging-validatesrunner-test \
[2/2] incubator-beam git commit: Fix a typo in query split error handling
Fix a typo in query split error handling Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6afb906 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6afb906 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6afb906 Branch: refs/heads/python-sdk Commit: d6afb90690f5be2f3cb38d68dc1d49a1b551118e Parents: 1392f70 Author: Vikas KedigehalliAuthored: Wed Dec 7 14:19:26 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:22:31 2016 -0800 -- .../apache_beam/io/datastore/v1/datastoreio.py | 2 +- .../io/datastore/v1/datastoreio_test.py | 29 2 files changed, 30 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index fc3e813..a86bb0b 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -181,7 +181,7 @@ class ReadFromDatastore(PTransform): except Exception: logging.warning("Unable to parallelize the given query: %s", query, exc_info=True) -query_splits = [(key, query)] +query_splits = [query] sharded_query_splits = [] for split_query in query_splits: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py index 2ac7ffb..f80a320 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py @@ -122,6 +122,35 @@ class DatastoreioTest(unittest.TestCase): self.assertEqual(1, len(returned_split_queries)) self.assertEqual(0, len(self._mock_datastore.method_calls)) + def 
test_SplitQueryFn_with_exception(self): +"""A test that verifies that no split is performed when failures occur.""" +with patch.object(helper, 'get_datastore', + return_value=self._mock_datastore): + # Force SplitQueryFn to compute the number of query splits + num_splits = 0 + expected_num_splits = 1 + entity_bytes = (expected_num_splits * + ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES) + with patch.object(ReadFromDatastore, 'get_estimated_size_bytes', +return_value=entity_bytes): + +with patch.object(query_splitter, 'get_splits', + side_effect=ValueError("Testing query split error")): + split_query_fn = ReadFromDatastore.SplitQueryFn( + self._PROJECT, self._query, None, num_splits) + mock_context = MagicMock() + mock_context.element = self._query + split_query_fn.start_bundle(mock_context) + returned_split_queries = [] + for split_query in split_query_fn.process(mock_context): +returned_split_queries.append(split_query) + + self.assertEqual(len(returned_split_queries), expected_num_splits) + self.assertEqual(returned_split_queries[0][1], self._query) + self.assertEqual(0, + len(self._mock_datastore.run_query.call_args_list)) + self.verify_unique_keys(returned_split_queries) + def test_DatastoreWriteFn_with_emtpy_batch(self): self.check_DatastoreWriteFn(0)
[1/2] incubator-beam git commit: Closes #1542
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 1392f70b6 -> 44c1586f3 Closes #1542 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44c1586f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44c1586f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44c1586f Branch: refs/heads/python-sdk Commit: 44c1586f3815f957a43037309f2a46a1766c6516 Parents: 1392f70 d6afb90 Author: Robert BradshawAuthored: Thu Dec 8 11:22:31 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:22:31 2016 -0800 -- .../apache_beam/io/datastore/v1/datastoreio.py | 2 +- .../io/datastore/v1/datastoreio_test.py | 29 2 files changed, 30 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Handle empty batches in GcsIO batch methods
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 75be6e974 -> 1392f70b6 Handle empty batches in GcsIO batch methods Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ef83b33 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ef83b33 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ef83b33 Branch: refs/heads/python-sdk Commit: 3ef83b3396a4574b3283b29ba1f878b31badd612 Parents: 75be6e9 Author: Charles ChenAuthored: Wed Dec 7 15:03:01 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:18:51 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 4 sdks/python/apache_beam/io/gcsio_test.py | 4 2 files changed, 8 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ef83b33/sdks/python/apache_beam/io/gcsio.py -- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 748465f..f150c4c 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -204,6 +204,8 @@ class GcsIO(object): argument, where exception is None if the operation succeeded or the relevant exception if the operation failed. """ +if not paths: + return [] batch_request = BatchApiRequest( retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES) for path in paths: @@ -264,6 +266,8 @@ class GcsIO(object): src_dest_pairs argument, where exception is None if the operation succeeded or the relevant exception if the operation failed. 
""" +if not src_dest_pairs: + return [] batch_request = BatchApiRequest( retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES) for src, dest in src_dest_pairs: http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ef83b33/sdks/python/apache_beam/io/gcsio_test.py -- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index 5af13c6..bd7eb51 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -265,6 +265,10 @@ class TestGCSIO(unittest.TestCase): with self.assertRaises(ValueError): self.gcs.open(file_name, 'r+b') + def test_empty_batches(self): +self.assertEqual([], self.gcs.copy_batch([])) +self.assertEqual([], self.gcs.delete_batch([])) + def test_delete(self): file_name = 'gs://gcsio-test/delete_me' file_size = 1024
[2/2] incubator-beam git commit: Closes #1544
Closes #1544 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1392f70b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1392f70b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1392f70b Branch: refs/heads/python-sdk Commit: 1392f70b6ee2db76e333fad16efe0cfdd00e7175 Parents: 75be6e9 3ef83b3 Author: Robert BradshawAuthored: Thu Dec 8 11:18:52 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:18:52 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 4 sdks/python/apache_beam/io/gcsio_test.py | 4 2 files changed, 8 insertions(+) --
[1/2] incubator-beam git commit: Add reference to the >> and | operators for pipelines.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 43057960a -> 75be6e974 Add reference to the >> and | operators for pipelines. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3510ff99 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3510ff99 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3510ff99 Branch: refs/heads/python-sdk Commit: 3510ff99da9cd149e67e8fdb12b942689374b2d7 Parents: 4305796 Author: Maria Garcia HerreroAuthored: Tue Dec 6 21:05:47 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:14:51 2016 -0800 -- sdks/python/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3510ff99/sdks/python/README.md -- diff --git a/sdks/python/README.md b/sdks/python/README.md index cff497c..820084d 100644 --- a/sdks/python/README.md +++ b/sdks/python/README.md @@ -163,8 +163,9 @@ The following examples demonstrate some basic, fundamental concepts for using Ap A basic pipeline will take as input an iterable, apply the beam.Create `PTransform`, and produce a `PCollection` that can -be written to a file or modified by further `PTransform`s. The -pipe operator allows to chain `PTransform`s. +be written to a file or modified by further `PTransform`s. +The `>>` operator is used to label `PTransform`s and +the `|` operator is used to chain them. ```python # Standard imports
[2/2] incubator-beam git commit: Closes #1521
Closes #1521 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75be6e97 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75be6e97 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75be6e97 Branch: refs/heads/python-sdk Commit: 75be6e974831f73ea935fe1f52fd7091a03c8928 Parents: 4305796 3510ff9 Author: Robert BradshawAuthored: Thu Dec 8 11:14:52 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 8 11:14:52 2016 -0800 -- sdks/python/README.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Fix template_runner_test on Windows
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 4a660c604 -> 43057960a Fix template_runner_test on Windows Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a565ca10 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a565ca10 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a565ca10 Branch: refs/heads/python-sdk Commit: a565ca1008309564dba41d551c68ea553cd83a7b Parents: 4a660c6 Author: Charles ChenAuthored: Wed Dec 7 16:09:49 2016 -0800 Committer: Charles Chen Committed: Wed Dec 7 16:09:49 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 14 ++ 1 file changed, 10 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a565ca10/sdks/python/apache_beam/runners/template_runner_test.py -- diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py index a141521..cc3d7c2 100644 --- a/sdks/python/apache_beam/runners/template_runner_test.py +++ b/sdks/python/apache_beam/runners/template_runner_test.py @@ -33,24 +33,30 @@ from apache_beam.internal import apiclient class TemplatingDataflowPipelineRunnerTest(unittest.TestCase): """TemplatingDataflow tests.""" def test_full_completion(self): -dummy_file = tempfile.NamedTemporaryFile() +# Create dummy file and close it. Note that we need to do this because +# Windows does not allow NamedTemporaryFiles to be reopened elsewhere +# before the temporary file is closed. 
+dummy_file = tempfile.NamedTemporaryFile(delete=False) +dummy_file_name = dummy_file.name +dummy_file.close() + dummy_dir = tempfile.mkdtemp() remote_runner = DataflowPipelineRunner() pipeline = Pipeline(remote_runner, options=PipelineOptions([ '--dataflow_endpoint=ignored', -'--sdk_location=' + dummy_file.name, +'--sdk_location=' + dummy_file_name, '--job_name=test-job', '--project=test-project', '--staging_location=' + dummy_dir, '--temp_location=/dev/null', -'--template_location=' + dummy_file.name, +'--template_location=' + dummy_file_name, '--no_auth=True'])) pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned pipeline.run() -with open(dummy_file.name) as template_file: +with open(dummy_file_name) as template_file: saved_job_dict = json.load(template_file) self.assertEqual( saved_job_dict['environment']['sdkPipelineOptions']
[2/2] incubator-beam git commit: Closes #1548
Closes #1548 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/43057960 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/43057960 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/43057960 Branch: refs/heads/python-sdk Commit: 43057960af906ff5c435ea016d5f6df5bccaea40 Parents: 4a660c6 a565ca1 Author: Robert BradshawAuthored: Wed Dec 7 18:17:39 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 7 18:17:39 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 14 ++ 1 file changed, 10 insertions(+), 4 deletions(-) --
[1/2] incubator-beam git commit: Closes #1492
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d5e8c79a3 -> 4a660c604 Closes #1492 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a660c60 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a660c60 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a660c60 Branch: refs/heads/python-sdk Commit: 4a660c604a0b21f685d87b8ecc008aeb13bb4049 Parents: d5e8c79 c7626ad Author: Robert BradshawAuthored: Wed Dec 7 12:14:38 2016 -0800 Committer: Robert Bradshaw Committed: Wed Dec 7 12:14:38 2016 -0800 -- sdks/python/run_postcommit.sh | 36 ++-- 1 file changed, 26 insertions(+), 10 deletions(-) --
[2/2] incubator-beam git commit: Closes #1485
Closes #1485 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f19f767b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f19f767b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f19f767b Branch: refs/heads/python-sdk Commit: f19f767b09275bdea325bb37a3767d96eeacd4a0 Parents: 6dcc429 aef4858 Author: Robert BradshawAuthored: Tue Dec 6 10:14:11 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:14:11 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 19 +-- sdks/python/apache_beam/internal/pickler.py | 8 2 files changed, 25 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Fix the pickle issue with the inconsistency of dill load and dump session
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6dcc429e5 -> f19f767b0 Fix the pickle issue with the inconsistency of dill load and dump session Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aef4858b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aef4858b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aef4858b Branch: refs/heads/python-sdk Commit: aef4858b80dfacf3401e6672b9373c82a8e77027 Parents: 6dcc429 Author: Sourabh BajajAuthored: Fri Dec 2 15:02:18 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:14:10 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 19 +-- sdks/python/apache_beam/internal/pickler.py | 8 2 files changed, 25 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index c5f5f70..f1341a7 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -46,7 +46,6 @@ from apache_beam.utils.options import WorkerOptions from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow - BIGQUERY_API_SERVICE = 'bigquery.googleapis.com' COMPUTE_API_SERVICE = 'compute.googleapis.com' STORAGE_API_SERVICE = 'storage.googleapis.com' @@ -55,13 +54,19 @@ STORAGE_API_SERVICE = 'storage.googleapis.com' class Step(object): """Wrapper for a dataflow Step protobuf.""" - def __init__(self, step_kind, step_name): + def __init__(self, step_kind, step_name, additional_properties=None): self.step_kind = step_kind self.step_name = step_name self.proto = dataflow.Step(kind=step_kind, name=step_name) self.proto.properties = {} +self._additional_properties = [] + +if additional_properties is not None: + for (n, v, t) 
in additional_properties: +self.add_property(n, v, t) def add_property(self, name, value, with_type=False): +self._additional_properties.append((name, value, with_type)) self.proto.properties.additionalProperties.append( dataflow.Step.PropertiesValue.AdditionalProperty( key=name, value=to_json_value(value, with_type=with_type))) @@ -77,6 +82,11 @@ class Step(object): outputs.append(entry_prop.value.string_value) return outputs + def __reduce__(self): +"""Reduce hook for pickling the Step class more easily +""" +return (Step, (self.step_kind, self.step_name, self._additional_properties)) + def get_output(self, tag=None): """Returns name if it is one of the outputs or first output if name is None. @@ -330,6 +340,11 @@ class Job(object): def json(self): return encoding.MessageToJson(self.proto) + def __reduce__(self): +"""Reduce hook for pickling the Job class more easily +""" +return (Job, (self.options,)) + class DataflowApplicationClient(object): """A Dataflow API client used by application code to create and query jobs.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/pickler.py -- diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index 30f0b77..d39a497 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -204,6 +204,14 @@ def loads(encoded): def dump_session(file_path): + """Pickle the current python session to be used in the worker. + + Note: Due to the inconsistency in the first dump of dill dump_session we + create and load the dump twice to have consistent results in the worker and + the running session. Check: https://github.com/uqfoundation/dill/issues/195 + """ + dill.dump_session(file_path) + dill.load_session(file_path) return dill.dump_session(file_path)
[2/2] incubator-beam git commit: Closes #1512
Closes #1512 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6dcc429e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6dcc429e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6dcc429e Branch: refs/heads/python-sdk Commit: 6dcc429e5bf4bb37605773be6de31efa3f887093 Parents: e73bdb5 4a02a68 Author: Robert BradshawAuthored: Tue Dec 6 10:10:45 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:10:45 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 82 +++ sdks/python/apache_beam/template_runner_test.py | 83 2 files changed, 82 insertions(+), 83 deletions(-) --
[1/2] incubator-beam git commit: Move template_runners_test to runners folder.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e73bdb5c2 -> 6dcc429e5 Move template_runners_test to runners folder. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a02a688 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a02a688 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a02a688 Branch: refs/heads/python-sdk Commit: 4a02a688dbacd640e40d7b3d3ca268fde9806e73 Parents: e73bdb5 Author: Ahmet AltayAuthored: Mon Dec 5 14:57:14 2016 -0800 Committer: Robert Bradshaw Committed: Tue Dec 6 10:10:44 2016 -0800 -- .../apache_beam/runners/template_runner_test.py | 82 +++ sdks/python/apache_beam/template_runner_test.py | 83 2 files changed, 82 insertions(+), 83 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a02a688/sdks/python/apache_beam/runners/template_runner_test.py -- diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py new file mode 100644 index 000..a141521 --- /dev/null +++ b/sdks/python/apache_beam/runners/template_runner_test.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for templated pipelines.""" + +from __future__ import absolute_import + +import json +import unittest +import tempfile + +import apache_beam as beam +from apache_beam.pipeline import Pipeline +from apache_beam.runners.dataflow_runner import DataflowPipelineRunner +from apache_beam.utils.options import PipelineOptions +from apache_beam.internal import apiclient + + +class TemplatingDataflowPipelineRunnerTest(unittest.TestCase): + """TemplatingDataflow tests.""" + def test_full_completion(self): +dummy_file = tempfile.NamedTemporaryFile() +dummy_dir = tempfile.mkdtemp() + +remote_runner = DataflowPipelineRunner() +pipeline = Pipeline(remote_runner, +options=PipelineOptions([ +'--dataflow_endpoint=ignored', +'--sdk_location=' + dummy_file.name, +'--job_name=test-job', +'--project=test-project', +'--staging_location=' + dummy_dir, +'--temp_location=/dev/null', +'--template_location=' + dummy_file.name, +'--no_auth=True'])) + +pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned +pipeline.run() +with open(dummy_file.name) as template_file: + saved_job_dict = json.load(template_file) + self.assertEqual( + saved_job_dict['environment']['sdkPipelineOptions'] + ['options']['project'], 'test-project') + self.assertEqual( + saved_job_dict['environment']['sdkPipelineOptions'] + ['options']['job_name'], 'test-job') + + def test_bad_path(self): +dummy_sdk_file = tempfile.NamedTemporaryFile() +remote_runner = DataflowPipelineRunner() +pipeline = Pipeline(remote_runner, +options=PipelineOptions([ +'--dataflow_endpoint=ignored', +'--sdk_location=' + dummy_sdk_file.name, +'--job_name=test-job', +'--project=test-project', +'--staging_location=ignored', +'--temp_location=/dev/null', +'--template_location=/bad/path', +'--no_auth=True'])) +remote_runner.job = apiclient.Job(pipeline.options) + +with self.assertRaises(IOError): + pipeline.run() + + +if __name__ == '__main__': + unittest.main() 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a02a688/sdks/python/apache_beam/template_runner_test.py
[1/2] incubator-beam git commit: Change export format to AVRO for BQ
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d59bccd82 -> eb98d636e Change export format to AVRO for BQ Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/390fbfdd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/390fbfdd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/390fbfdd Branch: refs/heads/python-sdk Commit: 390fbfddf444df410f7cb61329496f6f24a0532c Parents: d59bccd Author: Sourabh BajajAuthored: Mon Dec 5 14:04:54 2016 -0800 Committer: Sourabh Bajaj Committed: Mon Dec 5 14:04:54 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/390fbfdd/sdks/python/apache_beam/runners/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 8b953b0..a3f7d94 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -520,7 +520,7 @@ class DataflowPipelineRunner(PipelineRunner): elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) elif transform.source.format == 'bigquery': - step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON') + step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO') # TODO(silviuc): Add table validation if transform.source.validate. if transform.source.table_reference is not None: step.add_property(PropertyNames.BIGQUERY_DATASET,
[2/2] incubator-beam git commit: Closes #1510
Closes #1510 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb98d636 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb98d636 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb98d636 Branch: refs/heads/python-sdk Commit: eb98d636ec9bbe11795aa9ee2fea1a8ceddaf794 Parents: d59bccd 390fbfd Author: Robert BradshawAuthored: Mon Dec 5 17:10:34 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 5 17:10:34 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[2/2] incubator-beam git commit: Closes #1509
Closes #1509 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d59bccd8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d59bccd8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d59bccd8 Branch: refs/heads/python-sdk Commit: d59bccd82461d340613a16ab41db2a4cc6e4200b Parents: f7118c8 f1b83f7 Author: Robert BradshawAuthored: Mon Dec 5 13:01:19 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 5 13:01:19 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Add missing job parameter to the submit_job_description.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk f7118c8a5 -> d59bccd82 Add missing job parameter to the submit_job_description. Tested post commit test locally. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1b83f7e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1b83f7e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1b83f7e Branch: refs/heads/python-sdk Commit: f1b83f7e82b56cd36bffbcdd5cc8ab319bf1e9d3 Parents: f7118c8 Author: Ahmet AltayAuthored: Mon Dec 5 12:29:45 2016 -0800 Committer: Ahmet Altay Committed: Mon Dec 5 12:29:45 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1b83f7e/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index a894557..c5f5f70 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -412,7 +412,7 @@ class DataflowApplicationClient(object): self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) if not template_location: - return self.submit_job_description() + return self.submit_job_description(job) else: return None @@ -426,7 +426,7 @@ class DataflowApplicationClient(object): # TODO(silviuc): Remove the debug logging eventually. logging.info('JOB: %s', job) - def submit_job_description(self): + def submit_job_description(self, job): """Creates and excutes a job request.""" request = dataflow.DataflowProjectsJobsCreateRequest() request.projectId = self.google_cloud_options.project
[1/2] incubator-beam git commit: Modify create_job to allow staging the job and not submitting it to the service.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 7c5e4aa66 -> 0d99856f3 Modify create_job to allow staging the job and not submitting it to the service. - Modularize create_job in create job description, stage job, and send for execution. - Modify --dataflow_job_file to stage the job and continue submitting it to the service. - Add --template_location to stage the job but not submit it to the service. - Add tests for both, including making them mutually exclusive (following Java SDK decision). - Add template_runner_test.py with integration tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cfa0ad81 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cfa0ad81 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cfa0ad81 Branch: refs/heads/python-sdk Commit: cfa0ad8136b323bade9de14ea6149e7f74cbd0b4 Parents: 7c5e4aa Author: Maria Garcia HerreroAuthored: Wed Nov 2 09:14:48 2016 -0700 Committer: Robert Bradshaw Committed: Mon Dec 5 11:04:34 2016 -0800 -- sdks/python/apache_beam/examples/wordcount.py | 1 - sdks/python/apache_beam/internal/apiclient.py | 34 +++- .../apache_beam/runners/dataflow_runner.py | 13 ++- sdks/python/apache_beam/template_runner_test.py | 83 sdks/python/apache_beam/utils/options.py| 10 +++ .../apache_beam/utils/pipeline_options_test.py | 13 +++ .../utils/pipeline_options_validator_test.py| 28 +++ 7 files changed, 175 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/examples/wordcount.py -- diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py index 096e508..7f347d8 100644 --- a/sdks/python/apache_beam/examples/wordcount.py +++ b/sdks/python/apache_beam/examples/wordcount.py @@ -59,7 +59,6 @@ class WordExtractingDoFn(beam.DoFn): def run(argv=None): """Main entry point; defines and runs the 
wordcount pipeline.""" - parser = argparse.ArgumentParser() parser.add_argument('--input', dest='input', http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 5612631..a894557 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -24,6 +24,7 @@ import os import re import time +from StringIO import StringIO from apitools.base.py import encoding from apitools.base.py import exceptions @@ -42,7 +43,6 @@ from apache_beam.utils.options import DebugOptions from apache_beam.utils.options import GoogleCloudOptions from apache_beam.utils.options import StandardOptions from apache_beam.utils.options import WorkerOptions - from apache_beam.internal.clients import storage import apache_beam.internal.clients.dataflow as dataflow @@ -327,6 +327,9 @@ class Job(object): self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$') self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$') + def json(self): +return encoding.MessageToJson(self.proto) + class DataflowApplicationClient(object): """A Dataflow API client used by application code to create and query jobs.""" @@ -392,8 +395,29 @@ class DataflowApplicationClient(object): # TODO(silviuc): Refactor so that retry logic can be applied. @retry.no_retries # Using no_retries marks this as an integration point. def create_job(self, job): -"""Submits for remote execution a job described by the workflow proto.""" -# Stage job resources and add an environment proto with their paths. +"""Creates a job description. +Additionally, it may stage it and/or submit it for remote execution. 
+""" +self.create_job_description(job) + +# Stage and submit the job when necessary +dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file +template_location = ( +job.options.view_as(GoogleCloudOptions).template_location) + +job_location = template_location or dataflow_job_file +if job_location: + gcs_or_local_path = os.path.dirname(job_location) + file_name = os.path.basename(job_location) + self.stage_file(gcs_or_local_path, file_name, StringIO(job.json())) + +if not template_location: + return self.submit_job_description() +else: + return None + + def create_job_description(self, job): +"""Creates a job described by the
[2/2] incubator-beam git commit: Closes #1342
Closes #1342 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d99856f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d99856f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d99856f Branch: refs/heads/python-sdk Commit: 0d99856f37d6bca9bb8d676ae36157bd0515a4f2 Parents: 7c5e4aa cfa0ad8 Author: Robert BradshawAuthored: Mon Dec 5 11:04:35 2016 -0800 Committer: Robert Bradshaw Committed: Mon Dec 5 11:04:35 2016 -0800 -- sdks/python/apache_beam/examples/wordcount.py | 1 - sdks/python/apache_beam/internal/apiclient.py | 34 +++- .../apache_beam/runners/dataflow_runner.py | 13 ++- sdks/python/apache_beam/template_runner_test.py | 83 sdks/python/apache_beam/utils/options.py| 10 +++ .../apache_beam/utils/pipeline_options_test.py | 13 +++ .../utils/pipeline_options_validator_test.py| 28 +++ 7 files changed, 175 insertions(+), 7 deletions(-) --
[2/2] incubator-beam git commit: Removing a bug in .travis.yml that makes the build fail.
Removing a bug in .travis.yml that makes the build fail. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90db7908 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90db7908 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90db7908 Branch: refs/heads/python-sdk Commit: 90db7908b807cb752c23c445b220b3d5dd08b36b Parents: 9a175a5 Author: PabloAuthored: Tue Nov 29 13:58:55 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:15:41 2016 -0800 -- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90db7908/.travis.yml -- diff --git a/.travis.yml b/.travis.yml index 3080341..470d2fc 100644 --- a/.travis.yml +++ b/.travis.yml @@ -59,7 +59,7 @@ before_install: install: - if [ ! "$TEST_PYTHON" ]; then travis_retry mvn -B install clean -U -DskipTests=true; fi - if [ "$TEST_PYTHON" ] && pip list | grep tox; then TOX_FILE=`which tox` ; export TOX_HOME=`dirname $TOX_FILE`; fi - - if [ "$TEST_PYTHON" ] && ! pip list | grep tox; then travis_retry pip install tox --user `whoami`; fi + - if [ "$TEST_PYTHON" ] && ! pip list | grep tox; then travis_retry pip install tox --user; fi # Removing this here protects from inadvertent caching - rm -rf "$HOME/.m2/repository/org/apache/beam"
[1/2] incubator-beam git commit: Closes #1456
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 9a175a5fe -> 7c5e4aa66 Closes #1456 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c5e4aa6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c5e4aa6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c5e4aa6 Branch: refs/heads/python-sdk Commit: 7c5e4aa66c3916b98cf7ecf932f11c2b057e1858 Parents: 9a175a5 90db790 Author: Robert Bradshaw Authored: Fri Dec 2 22:15:41 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:15:41 2016 -0800 -- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Call from_p12_keyfile() with the correct arguments.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 7c0bf25fa -> 9a175a5fe Call from_p12_keyfile() with the correct arguments. This code path is failing, because a wrong list of arguments is passed. Fixing that uncovered that oauth2client depends on pyOpenSSL for this call to work. I did not add this dependency to setup.py because, it does not install cleanly in all environments. As a workaround, users who would like to use this authentication method could first do a 'pip install pyOpenSSL'. I added a test, that skips if 'pyOpenSSL' is not installed. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f23b717e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f23b717e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f23b717e Branch: refs/heads/python-sdk Commit: f23b717e7e433c30c0acee0fea8d179e6343b8b8 Parents: 7c0bf25 Author: Ahmet AltayAuthored: Fri Dec 2 13:38:31 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:14:32 2016 -0800 -- sdks/python/apache_beam/internal/auth.py| 6 +-- sdks/python/apache_beam/internal/auth_test.py | 44 +++ sdks/python/apache_beam/tests/data/README.md| 20 + .../apache_beam/tests/data/privatekey.p12 | Bin 0 -> 2452 bytes sdks/python/setup.py| 2 +- 5 files changed, 68 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f23b717e/sdks/python/apache_beam/internal/auth.py -- diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py index a043fcf..056f40c 100644 --- a/sdks/python/apache_beam/internal/auth.py +++ b/sdks/python/apache_beam/internal/auth.py @@ -133,6 +133,7 @@ def get_service_credentials(): 'https://www.googleapis.com/auth/datastore' ] +# TODO(BEAM-1068): Do not recreate options from sys.argv. # We are currently being run from the command line. 
google_cloud_options = PipelineOptions( sys.argv).view_as(GoogleCloudOptions) @@ -151,8 +152,8 @@ def get_service_credentials(): return ServiceAccountCredentials.from_p12_keyfile( google_cloud_options.service_account_name, google_cloud_options.service_account_key_file, -client_scopes, -user_agent=user_agent) +private_key_password=None, +scopes=client_scopes) except ImportError: with file(google_cloud_options.service_account_key_file) as f: service_account_key = f.read() @@ -162,7 +163,6 @@ def get_service_credentials(): service_account_key, client_scopes, user_agent=user_agent) - else: try: credentials = _GCloudWrapperCredentials(user_agent) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f23b717e/sdks/python/apache_beam/internal/auth_test.py -- diff --git a/sdks/python/apache_beam/internal/auth_test.py b/sdks/python/apache_beam/internal/auth_test.py new file mode 100644 index 000..dfd408e --- /dev/null +++ b/sdks/python/apache_beam/internal/auth_test.py @@ -0,0 +1,44 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""Unit tests for the auth module.""" + +import os +import sys +import unittest + +import mock + +from apache_beam.internal import auth + + +class AuthTest(unittest.TestCase): + + def test_create_application_client(self): +try: + test_args = [ + 'test', '--service_account_name', 'abc', '--service_account_key_file', + os.path.join( + os.path.dirname(__file__), '..', 'tests/data/privatekey.p12')] + with mock.patch.object(sys, 'argv', test_args): +credentials = auth.get_service_credentials() +self.assertIsNotNone(credentials) +except NotImplementedError: +
[2/2] incubator-beam git commit: Closes #1491
Closes #1491 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a175a5f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a175a5f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a175a5f Branch: refs/heads/python-sdk Commit: 9a175a5fe17d087f2c3ca0ff8e6d53faad6beab4 Parents: 7c0bf25 f23b717 Author: Robert Bradshaw Authored: Fri Dec 2 22:14:33 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:14:33 2016 -0800 -- sdks/python/apache_beam/internal/auth.py| 6 +-- sdks/python/apache_beam/internal/auth_test.py | 44 +++ sdks/python/apache_beam/tests/data/README.md| 20 + .../apache_beam/tests/data/privatekey.p12 | Bin 0 -> 2452 bytes sdks/python/setup.py| 2 +- 5 files changed, 68 insertions(+), 4 deletions(-) --
[1/2] incubator-beam git commit: Add labels to lambdas in write finalization
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 72fa21f98 -> 7c0bf25fa Add labels to lambdas in write finalization Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51e97d4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51e97d4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51e97d4a Branch: refs/heads/python-sdk Commit: 51e97d4a8d3f25608b6ee80f57c973186798d54f Parents: 72fa21f Author: Charles ChenAuthored: Fri Dec 2 15:17:55 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:12:35 2016 -0800 -- sdks/python/apache_beam/io/iobase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51e97d4a/sdks/python/apache_beam/io/iobase.py -- diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py index b7cac3e..fd6ae57 100644 --- a/sdks/python/apache_beam/io/iobase.py +++ b/sdks/python/apache_beam/io/iobase.py @@ -767,10 +767,10 @@ class WriteImpl(ptransform.PTransform): write_result_coll = (pcoll | core.ParDo('write_bundles', _WriteBundleDoFn(), self.sink, AsSingleton(init_result_coll)) - | core.Map(lambda x: (None, x)) + | core.Map('pair', lambda x: (None, x)) | core.WindowInto(window.GlobalWindows()) | core.GroupByKey() - | core.FlatMap(lambda x: x[1])) + | core.FlatMap('extract', lambda x: x[1])) return do_once | core.FlatMap( 'finalize_write', _finalize_write,
[2/2] incubator-beam git commit: Closes #1496
Closes #1496 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c0bf25f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c0bf25f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c0bf25f Branch: refs/heads/python-sdk Commit: 7c0bf25fadbe2e74ab62c87c90111b8b7c34e297 Parents: 72fa21f 51e97d4 Author: Robert Bradshaw Authored: Fri Dec 2 22:12:36 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:12:36 2016 -0800 -- sdks/python/apache_beam/io/iobase.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Make the legacy SQL flag consistent between Java and Python
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 8365b6838 -> 72fa21f98 Make the legacy SQL flag consistent between Java and Python Renamed the BigQuery use_legacy_sql parameter to use_standard_sql. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72721031 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72721031 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72721031 Branch: refs/heads/python-sdk Commit: 727210318404a585bb7742591ade0a09ccc20444 Parents: 8365b68 Author: Sourabh BajajAuthored: Fri Dec 2 16:45:19 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:10:21 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/io/bigquery.py| 9 + sdks/python/apache_beam/io/bigquery_test.py | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 580a3d7..6dcf05e 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -962,7 +962,7 @@ def model_bigqueryio(): 'ReadYearAndTemp', beam.io.BigQuerySource( query='SELECT year, mean_temp FROM `samples.weather_stations`', - use_legacy_sql=False)) + use_standard_sql=True)) # [END model_bigqueryio_query_standard_sql] # [START model_bigqueryio_schema] http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 0885e3a..ce75e10 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -323,7 +323,7 @@ class 
BigQuerySource(dataflow_io.NativeSource): """A source based on a BigQuery table.""" def __init__(self, table=None, dataset=None, project=None, query=None, - validate=False, coder=None, use_legacy_sql=True): + validate=False, coder=None, use_standard_sql=False): """Initialize a BigQuerySource. Args: @@ -351,8 +351,8 @@ class BigQuerySource(dataflow_io.NativeSource): in a file as a JSON serialized dictionary. This argument needs a value only in special cases when returning table rows as dictionaries is not desirable. - useLegacySql: Specifies whether to use BigQuery's legacy -SQL dialect for this query. The default value is true. If set to false, + use_standard_sql: Specifies whether to use BigQuery's standard +SQL dialect for this query. The default value is False. If set to True, the query will use BigQuery's updated SQL dialect with improved standards compliance. This parameter is ignored for table inputs. @@ -374,7 +374,8 @@ class BigQuerySource(dataflow_io.NativeSource): self.use_legacy_sql = True else: self.query = query - self.use_legacy_sql = use_legacy_sql + # TODO(BEAM-1082): Change the internal flag to be standard_sql + self.use_legacy_sql = not use_standard_sql self.table_reference = None self.validate = validate http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index e263e13..a2cf947 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -199,7 +199,7 @@ class TestBigQuerySource(unittest.TestCase): hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items)) def test_specify_query_sql_format(self): -source = beam.io.BigQuerySource(query='my_query', use_legacy_sql=False) +source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True) self.assertEqual(source.query, 'my_query') self.assertFalse(source.use_legacy_sql) @@ 
-371,7 +371,7 @@ class TestBigQueryReader(unittest.TestCase): jobComplete=True, rows=table_rows, schema=schema) actual_rows = [] with beam.io.BigQuerySource( -query='query', use_legacy_sql=False).reader(client) as reader: +query='query', use_standard_sql=True).reader(client) as reader: for
[2/2] incubator-beam git commit: Closes #1497
Closes #1497 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72fa21f9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72fa21f9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72fa21f9 Branch: refs/heads/python-sdk Commit: 72fa21f98527e57cd5c7fad3977c95d7c994325e Parents: 8365b68 7272103 Author: Robert Bradshaw Authored: Fri Dec 2 22:10:22 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:10:22 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/io/bigquery.py| 9 + sdks/python/apache_beam/io/bigquery_test.py | 4 ++-- 3 files changed, 8 insertions(+), 7 deletions(-) --
[2/2] incubator-beam git commit: Closes #1494
Closes #1494 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8365b683 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8365b683 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8365b683 Branch: refs/heads/python-sdk Commit: 8365b6838eda6dcef39097ab19b85b9af270914f Parents: fd6a52c 16ffdb2 Author: Robert Bradshaw Authored: Fri Dec 2 22:06:42 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:06:42 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/internal/apiclient_test.py | 1 + sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 10 +++--- sdks/python/apache_beam/io/datastore/v1/helper.py | 4 +++- 4 files changed, 8 insertions(+), 9 deletions(-) --
[1/2] incubator-beam git commit: Fix auth related unit test failures
Repository: incubator-beam Updated Branches: refs/heads/python-sdk fd6a52c15 -> 8365b6838 Fix auth related unit test failures Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/16ffdb25 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/16ffdb25 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/16ffdb25 Branch: refs/heads/python-sdk Commit: 16ffdb25f6029c4bee71f035d8d9747f6330ec9f Parents: fd6a52c Author: Vikas KedigehalliAuthored: Fri Dec 2 14:13:31 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 22:06:41 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 2 +- sdks/python/apache_beam/internal/apiclient_test.py | 1 + sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 10 +++--- sdks/python/apache_beam/io/datastore/v1/helper.py | 4 +++- 4 files changed, 8 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index c2a047f..580a3d7 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -919,7 +919,7 @@ def model_datastoreio(): # [START model_datastoreio_write] p = beam.Pipeline(options=PipelineOptions()) musicians = p | 'Musicians' >> beam.Create( - ['Mozart', 'Chopin', 'Beethoven', 'Bach']) + ['Mozart', 'Chopin', 'Beethoven', 'Vivaldi']) def to_entity(content): entity = entity_pb2.Entity() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/internal/apiclient_test.py -- diff --git a/sdks/python/apache_beam/internal/apiclient_test.py b/sdks/python/apache_beam/internal/apiclient_test.py index 66cc8db..31b2dad 100644 --- a/sdks/python/apache_beam/internal/apiclient_test.py +++ 
b/sdks/python/apache_beam/internal/apiclient_test.py @@ -25,6 +25,7 @@ from apache_beam.internal import apiclient class UtilTest(unittest.TestCase): + @unittest.skip("Enable once BEAM-1080 is fixed.") def test_create_application_client(self): pipeline_options = PipelineOptions() apiclient.DataflowApplicationClient( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/io/datastore/v1/datastoreio.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py index 054002f..20466b9 100644 --- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py +++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py @@ -22,7 +22,6 @@ import logging from google.datastore.v1 import datastore_pb2 from googledatastore import helper as datastore_helper -from apache_beam.internal import auth from apache_beam.io.datastore.v1 import helper from apache_beam.io.datastore.v1 import query_splitter from apache_beam.transforms import Create @@ -154,8 +153,7 @@ class ReadFromDatastore(PTransform): self._num_splits = num_splits def start_bundle(self, context): - self._datastore = helper.get_datastore(self._project, - auth.get_service_credentials()) + self._datastore = helper.get_datastore(self._project) def process(self, p_context, *args, **kwargs): # distinct key to be used to group query splits. 
@@ -210,8 +208,7 @@ class ReadFromDatastore(PTransform): self._datastore = None def start_bundle(self, context): - self._datastore = helper.get_datastore(self._project, - auth.get_service_credentials()) + self._datastore = helper.get_datastore(self._project) def process(self, p_context, *args, **kwargs): query = p_context.element @@ -341,8 +338,7 @@ class _Mutate(PTransform): def start_bundle(self, context): self._mutations = [] - self._datastore = helper.get_datastore(self._project, - auth.get_service_credentials()) + self._datastore = helper.get_datastore(self._project) def process(self, context): self._mutations.append(context.element) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/io/datastore/v1/helper.py -- diff --git
[2/2] incubator-beam git commit: Closes #1481
Closes #1481 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2363ee51 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2363ee51 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2363ee51 Branch: refs/heads/python-sdk Commit: 2363ee510694dab20d925f7d4979fc6dcd495477 Parents: a463f00 557a2f9 Author: Robert Bradshaw Authored: Fri Dec 2 11:34:29 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 11:34:29 2016 -0800 -- .../apache_beam/examples/snippets/snippets.py | 41 .../examples/snippets/snippets_test.py | 9 - 2 files changed, 49 insertions(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Add snippet for datastoreio
Repository: incubator-beam Updated Branches: refs/heads/python-sdk a463f000e -> 2363ee510 Add snippet for datastoreio Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/557a2f92 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/557a2f92 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/557a2f92 Branch: refs/heads/python-sdk Commit: 557a2f92f67bfc545533fa35852485e9c4c0b785 Parents: a463f00 Author: Vikas KedigehalliAuthored: Thu Dec 1 10:27:05 2016 -0800 Committer: Robert Bradshaw Committed: Fri Dec 2 11:34:28 2016 -0800 -- .../apache_beam/examples/snippets/snippets.py | 41 .../examples/snippets/snippets_test.py | 9 - 2 files changed, 49 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 8d05130..c2a047f 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -891,6 +891,47 @@ def model_textio(renames): p.run() +def model_datastoreio(): + """Using a Read and Write transform to read/write to Cloud Datastore. 
+ + URL: https://cloud.google.com/dataflow/model/datastoreio + """ + + import uuid + from google.datastore.v1 import entity_pb2 + from google.datastore.v1 import query_pb2 + import googledatastore + import apache_beam as beam + from apache_beam.utils.options import PipelineOptions + from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore + from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore + + project = 'my_project' + kind = 'my_kind' + query = query_pb2.Query() + query.kind.add().name = kind + + # [START model_datastoreio_read] + p = beam.Pipeline(options=PipelineOptions()) + entities = p | 'Read From Datastore' >> ReadFromDatastore(project, query) + # [END model_datastoreio_read] + + # [START model_datastoreio_write] + p = beam.Pipeline(options=PipelineOptions()) + musicians = p | 'Musicians' >> beam.Create( + ['Mozart', 'Chopin', 'Beethoven', 'Bach']) + + def to_entity(content): +entity = entity_pb2.Entity() +googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4())) +googledatastore.helper.add_properties(entity, {'content': unicode(content)}) +return entity + + entities = musicians | 'To Entity' >> beam.Map(to_entity) + entities | 'Write To Datastore' >> WriteToDatastore(project) + # [END model_datastoreio_write] + + def model_bigqueryio(): """Using a Read and Write transform to read/write to BigQuery. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets_test.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 72fccb2..09b4ba4 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -470,11 +470,18 @@ class SnippetsTest(unittest.TestCase): ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'], self.get_output(result_path, suffix='.csv')) + def test_model_datastoreio(self): +# We cannot test datastoreio functionality in unit tests therefore we limit +# ourselves to making sure the pipeline containing Datastore read and write +# transforms can be built. +# TODO(vikasrk): Expore using Datastore Emulator. +snippets.model_datastoreio() + def test_model_bigqueryio(self): # We cannot test BigQueryIO functionality in unit tests therefore we limit # ourselves to making sure the pipeline containing BigQuery sources and # sinks can be built. -self.assertEqual(None, snippets.model_bigqueryio()) +snippets.model_bigqueryio() def _run_test_pipeline_for_options(self, fn): temp_path = self.create_temp_file('aa\nbb\ncc')
[2/2] incubator-beam git commit: Closes #1472
Closes #1472 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ded359d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ded359d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ded359d Branch: refs/heads/python-sdk Commit: 9ded359daefc6040d61a1f33c77563474fcb09b6 Parents: 4414e20 dc68365 Author: Robert Bradshaw Authored: Thu Dec 1 09:40:32 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:40:32 2016 -0800 -- sdks/python/apache_beam/examples/snippets/snippets.py | 9 + 1 file changed, 9 insertions(+) --
[1/2] incubator-beam git commit: Parse table schema from JSON
Repository: incubator-beam Updated Branches: refs/heads/python-sdk aa9071d56 -> 739a43197 Parse table schema from JSON Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4c2f62b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4c2f62b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4c2f62b Branch: refs/heads/python-sdk Commit: b4c2f62be8a809b666089e7b2fe5dada9cbd6c16 Parents: aa9071d Author: Sourabh BajajAuthored: Wed Nov 30 13:48:28 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:07:27 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 37 sdks/python/apache_beam/io/bigquery_test.py | 22 ++ 2 files changed, 59 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index 8d7892a..0885e3a 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -200,6 +200,43 @@ class TableRowJsonCoder(coders.Coder): f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()]) +def parse_table_schema_from_json(schema_string): + """Parse the Table Schema provided as string. + + Args: +schema_string: String serialized table schema, should be a valid JSON. + + Returns: +A TableSchema of the BigQuery export from either the Query or the Table. + """ + json_schema = json.loads(schema_string) + + def _parse_schema_field(field): +"""Parse a single schema field from dictionary. + +Args: + field: Dictionary object containing serialized schema. + +Returns: + A TableFieldSchema for a single column in BigQuery. 
+""" +schema = bigquery.TableFieldSchema() +schema.name = field['name'] +schema.type = field['type'] +if 'mode' in field: + schema.mode = field['mode'] +else: + schema.mode = 'NULLABLE' +if 'description' in field: + schema.description = field['description'] +if 'fields' in field: + schema.fields = [_parse_schema_field(x) for x in field['fields']] +return schema + + fields = [_parse_schema_field(f) for f in json_schema['fields']] + return bigquery.TableSchema(fields=fields) + + class BigQueryDisposition(object): """Class holding standard strings used for create and write dispositions.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery_test.py -- diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py index b0c3bbe..e263e13 100644 --- a/sdks/python/apache_beam/io/bigquery_test.py +++ b/sdks/python/apache_beam/io/bigquery_test.py @@ -32,6 +32,7 @@ from apache_beam.internal.clients import bigquery from apache_beam.internal.json_value import to_json_value from apache_beam.io.bigquery import RowAsDictJsonCoder from apache_beam.io.bigquery import TableRowJsonCoder +from apache_beam.io.bigquery import parse_table_schema_from_json from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.utils.options import PipelineOptions @@ -113,6 +114,27 @@ class TestTableRowJsonCoder(unittest.TestCase): self.json_compliance_exception(float('-inf')) +class TestTableSchemaParser(unittest.TestCase): + def test_parse_table_schema_from_json(self): +string_field = bigquery.TableFieldSchema( +name='s', type='STRING', mode='NULLABLE', description='s description') +number_field = bigquery.TableFieldSchema( +name='n', type='INTEGER', mode='REQUIRED', description='n description') +record_field = bigquery.TableFieldSchema( +name='r', type='RECORD', mode='REQUIRED', description='r description', 
+fields=[string_field, number_field]) +expected_schema = bigquery.TableSchema(fields=[record_field]) +json_str = json.dumps({'fields': [ +{'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED', + 'description': 'r description', 'fields': [ + {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE', + 'description': 's description'}, + {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED', + 'description': 'n description'}]}]}) +self.assertEqual(parse_table_schema_from_json(json_str), + expected_schema) + + class TestBigQuerySource(unittest.TestCase): def test_display_data_item_on_validate_true(self):
[2/2] incubator-beam git commit: Closes #1468
Closes #1468 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/739a4319 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/739a4319 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/739a4319 Branch: refs/heads/python-sdk Commit: 739a431975120fe267e6b81635ce6e2356bd2895 Parents: aa9071d b4c2f62 Author: Robert Bradshaw Authored: Thu Dec 1 09:07:28 2016 -0800 Committer: Robert Bradshaw Committed: Thu Dec 1 09:07:28 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 37 sdks/python/apache_beam/io/bigquery_test.py | 22 ++ 2 files changed, 59 insertions(+) --
[2/2] incubator-beam git commit: Closes #1454
Closes #1454 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa9071d5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa9071d5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa9071d5 Branch: refs/heads/python-sdk Commit: aa9071d56120a1c0f91cc580fb956570113ee104 Parents: 70c1de9 495a2d8 Author: Robert Bradshaw Authored: Wed Nov 30 14:08:58 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 30 14:08:58 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 210 +++ .../apache_beam/examples/datastore_wordcount.py | 203 -- sdks/python/setup.py| 2 +- 3 files changed, 211 insertions(+), 204 deletions(-) --
[1/2] incubator-beam git commit: Few datastoreio fixes
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 70c1de9b9 -> aa9071d56 Few datastoreio fixes - pin googledatastore version to 6.4.1 - add num_shards options to datastore wordcount example - move datastore wordcount example to cookbook directory Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/495a2d8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/495a2d8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/495a2d8c Branch: refs/heads/python-sdk Commit: 495a2d8c01949e4248d7e6dc9ad7a04168a292d1 Parents: 70c1de9 Author: Vikas KedigehalliAuthored: Tue Nov 29 10:02:14 2016 -0800 Committer: Vikas Kedigehalli Committed: Wed Nov 30 12:18:53 2016 -0800 -- .../examples/cookbook/datastore_wordcount.py| 210 +++ .../apache_beam/examples/datastore_wordcount.py | 203 -- sdks/python/setup.py| 2 +- 3 files changed, 211 insertions(+), 204 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/495a2d8c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py new file mode 100644 index 000..eb62614 --- /dev/null +++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py @@ -0,0 +1,210 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A word-counting workflow that uses Google Cloud Datastore.""" + +from __future__ import absolute_import + +import argparse +import logging +import re +import uuid + +from google.datastore.v1 import entity_pb2 +from google.datastore.v1 import query_pb2 +from googledatastore import helper as datastore_helper, PropertyFilter + +import apache_beam as beam +from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore +from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore +from apache_beam.utils.options import GoogleCloudOptions +from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.options import SetupOptions + +empty_line_aggregator = beam.Aggregator('emptyLines') +average_word_size_aggregator = beam.Aggregator('averageWordLength', + beam.combiners.MeanCombineFn(), + float) + + +class WordExtractingDoFn(beam.DoFn): + """Parse each line of input text into words.""" + + def process(self, context): +"""Returns an iterator over words in contents of Cloud Datastore entity. +The element is a line of text. If the line is blank, note that, too. +Args: + context: the call-specific context: data and aggregator. +Returns: + The processed element. 
+""" +content_value = context.element.properties.get('content', None) +text_line = '' +if content_value: + text_line = content_value.string_value + +if not text_line: + context.aggregate_to(empty_line_aggregator, 1) +words = re.findall(r'[A-Za-z\']+', text_line) +for w in words: + context.aggregate_to(average_word_size_aggregator, len(w)) +return words + + +class EntityWrapper(object): + """Create a Cloud Datastore entity from the given string.""" + def __init__(self, namespace, kind, ancestor): +self._namespace = namespace +self._kind = kind +self._ancestor = ancestor + + def make_entity(self, content): +entity = entity_pb2.Entity() +if self._namespace is not None: + entity.key.partition_id.namespace_id = self._namespace + +# All entities created will have the same ancestor +datastore_helper.add_key_path(entity.key, self._kind, self._ancestor, + self._kind, str(uuid.uuid4())) + +datastore_helper.add_properties(entity, {"content": unicode(content)}) +return entity + + +def
[1/2] incubator-beam git commit: Improve GcsIO throughput by 10x
Repository: incubator-beam Updated Branches: refs/heads/python-sdk cce4331dc -> c1440f7aa Improve GcsIO throughput by 10x This change increases the read buffer used from 1M to 16M. Previously, the speed of reading an incompressible file were: (50 MB: 3.17 MB/s, 100 MB: 3.79 MB/s, 200 MB: 4.13 MB/s, 400 MB: 4.24 MB/s). The speed is now improved to: (50 MB: 24.21 MB/s, 100 MB: 42.70 MB/s, 200 MB: 42.89 MB/s, 400 MB: 46.92 MB/s). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4a332d9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4a332d9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4a332d9 Branch: refs/heads/python-sdk Commit: e4a332d9de5eca941e08f23242cd63bb83148312 Parents: cce4331 Author: Charles ChenAuthored: Thu Nov 17 11:46:44 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 21:53:26 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 20 ++-- 1 file changed, 18 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4a332d9/sdks/python/apache_beam/io/gcsio.py -- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 1b08994..4f310be 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -47,7 +47,23 @@ except ImportError: 'Google Cloud Storage I/O not supported for this execution environment ' '(could not import storage API client).') -DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 +# This is the size of each partial-file read operation from GCS. 
This +# parameter was chosen to give good throughput while keeping memory usage at +# a reasonable level; the following table shows throughput reached when +# reading files of a given size with a chosen buffer size and informed the +# choice of the value, as of 11/2016: +# +# +---++-+-+-+ +# | | 50 MB file | 100 MB file | 200 MB file | 400 MB file | +# +---++-+-+-+ +# | 8 MB buffer | 17.12 MB/s | 22.67 MB/s | 23.81 MB/s | 26.05 MB/s | +# | 16 MB buffer | 24.21 MB/s | 42.70 MB/s | 42.89 MB/s | 46.92 MB/s | +# | 32 MB buffer | 28.53 MB/s | 48.08 MB/s | 54.30 MB/s | 54.65 MB/s | +# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s | 79.13 MB/s | 85.39 MB/s | +# +---++-+-+-+ +DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 + +# This is the size of chunks used when writing to GCS. WRITE_CHUNK_SIZE = 8 * 1024 * 1024 @@ -373,7 +389,7 @@ class GcsBufferedReader(object): # Initialize read buffer state. self.download_stream = StringIO.StringIO() self.downloader = transfer.Download( -self.download_stream, auto_transfer=False) +self.download_stream, auto_transfer=False, chunksize=buffer_size) self.client.objects.Get(get_request, download=self.downloader) self.position = 0 self.buffer = ''
[2/2] incubator-beam git commit: Closes #1379
Closes #1379 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1440f7a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1440f7a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1440f7a Branch: refs/heads/python-sdk Commit: c1440f7aa69f0134d52463c4bcdcabce36b481d7 Parents: cce4331 e4a332d Author: Robert Bradshaw Authored: Fri Nov 18 21:53:27 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 21:53:27 2016 -0800 -- sdks/python/apache_beam/io/gcsio.py | 20 ++-- 1 file changed, 18 insertions(+), 2 deletions(-) --
[2/2] incubator-beam git commit: Closes #1380
Closes #1380 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cce4331d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cce4331d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cce4331d Branch: refs/heads/python-sdk Commit: cce4331dc7ed95aa32654e77d2cc170b63437183 Parents: 45b420d 99bcafe Author: Robert Bradshaw Authored: Fri Nov 18 13:37:05 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:37:05 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 5 +++ sdks/python/apache_beam/io/fileio_test.py | 47 ++ 2 files changed, 52 insertions(+) --
[1/2] incubator-beam git commit: Fix issue where batch GCS renames were not issued
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 45b420d82 -> cce4331dc Fix issue where batch GCS renames were not issued Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99bcafe7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99bcafe7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99bcafe7 Branch: refs/heads/python-sdk Commit: 99bcafe7a02bbec5222d77abbad24f5eed8a687f Parents: 45b420d Author: Charles ChenAuthored: Thu Nov 17 14:13:56 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:37:04 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 5 +++ sdks/python/apache_beam/io/fileio_test.py | 47 ++ 2 files changed, 52 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 3b67c4f..4d0eea6 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -514,6 +514,7 @@ class ChannelFactory(object): gcs_batches = [] gcs_current_batch = [] for src, dest in src_dest_pairs: + gcs_current_batch.append((src, dest)) if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: gcs_batches.append(gcs_current_batch) gcs_current_batch = [] @@ -893,6 +894,8 @@ class FileSink(iobase.Sink): exception_infos = ChannelFactory.rename_batch(batch) for src, dest, exception in exception_infos: if exception: + logging.warning('Rename not successful: %s -> %s, %s', src, dest, + exception) should_report = True if isinstance(exception, IOError): # May have already been copied. @@ -906,6 +909,8 @@ class FileSink(iobase.Sink): logging.warning(('Exception in _rename_batch. 
src: %s, ' 'dest: %s, err: %s'), src, dest, exception) exceptions.append(exception) +else: + logging.debug('Rename successful: %s -> %s', src, dest) return exceptions # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio_test.py -- diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py index 63e71e0..9d1e424 100644 --- a/sdks/python/apache_beam/io/fileio_test.py +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -28,6 +28,7 @@ import unittest import zlib import hamcrest as hc +import mock import apache_beam as beam from apache_beam import coders @@ -881,6 +882,52 @@ class TestFileSink(unittest.TestCase): with self.assertRaises(Exception): list(sink.finalize_write(init_token, [res1, res2])) + @mock.patch('apache_beam.io.fileio.ChannelFactory.rename') + @mock.patch('apache_beam.io.fileio.gcsio') + def test_rename_batch(self, *unused_args): +# Prepare mocks. +gcsio_mock = mock.MagicMock() +fileio.gcsio.GcsIO = lambda: gcsio_mock +fileio.ChannelFactory.rename = mock.MagicMock() +to_rename = [ +('gs://bucket/from1', 'gs://bucket/to1'), +('gs://bucket/from2', 'gs://bucket/to2'), +('/local/from1', '/local/to1'), +('gs://bucket/from3', 'gs://bucket/to3'), +('/local/from2', '/local/to2'), +] +gcsio_mock.copy_batch.side_effect = [[ +('gs://bucket/from1', 'gs://bucket/to1', None), +('gs://bucket/from2', 'gs://bucket/to2', None), +('gs://bucket/from3', 'gs://bucket/to3', None), +]] +gcsio_mock.delete_batch.side_effect = [[ +('gs://bucket/from1', None), +('gs://bucket/from2', None), +('gs://bucket/from3', None), +]] + +# Issue batch rename. +fileio.ChannelFactory.rename_batch(to_rename) + +# Verify mocks. 
+expected_local_rename_calls = [ +mock.call('/local/from1', '/local/to1'), +mock.call('/local/from2', '/local/to2'), +] +self.assertEqual(fileio.ChannelFactory.rename.call_args_list, + expected_local_rename_calls) +gcsio_mock.copy_batch.assert_called_once_with([ +('gs://bucket/from1', 'gs://bucket/to1'), +('gs://bucket/from2', 'gs://bucket/to2'), +('gs://bucket/from3', 'gs://bucket/to3'), +]) +gcsio_mock.delete_batch.assert_called_once_with([ +'gs://bucket/from1', +'gs://bucket/from2', +'gs://bucket/from3', +]) + if __name__
[1/2] incubator-beam git commit: Remove redundant REQUIRED_PACKAGES
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 3a0f01c8e -> 45b420d82 Remove redundant REQUIRED_PACKAGES Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/329be6e9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/329be6e9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/329be6e9 Branch: refs/heads/python-sdk Commit: 329be6e9a16bfe865d61c3d6041ec5fb6707fc6a Parents: 3a0f01c Author: Ahmet AltayAuthored: Thu Nov 17 15:20:08 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:35:47 2016 -0800 -- sdks/python/setup.py | 3 --- 1 file changed, 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/329be6e9/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index b8034af..1299bbf 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -96,9 +96,6 @@ REQUIRED_PACKAGES = [ 'python-gflags>=2.0,<4.0.0', 'pyyaml>=3.10,<4.0.0', ] -REQUIRED_TEST_PACKAGES = [ -'pyhamcrest>=1.9,<2.0', -] REQUIRED_TEST_PACKAGES = [
[2/2] incubator-beam git commit: Closes #1383
Closes #1383 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45b420d8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45b420d8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45b420d8 Branch: refs/heads/python-sdk Commit: 45b420d82aa6f47e3d37f5aa5ba98378cdc01e9c Parents: 3a0f01c 329be6e Author: Robert Bradshaw Authored: Fri Nov 18 13:35:48 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:35:48 2016 -0800 -- sdks/python/setup.py | 3 --- 1 file changed, 3 deletions(-) --
[2/2] incubator-beam git commit: Closes #1385
Closes #1385 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a0f01c8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a0f01c8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a0f01c8 Branch: refs/heads/python-sdk Commit: 3a0f01c8edd36fe525b8ad155011dfb759dad2b4 Parents: b83f12b 93c5233 Author: Robert Bradshaw Authored: Fri Nov 18 13:33:46 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:33:46 2016 -0800 -- sdks/python/apache_beam/io/filebasedsource.py | 14 ++ .../python/apache_beam/io/filebasedsource_test.py | 18 +- 2 files changed, 27 insertions(+), 5 deletions(-) --
[1/2] incubator-beam git commit: Fixes a couple of issues of FileBasedSource.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk b83f12b9f -> 3a0f01c8e Fixes a couple of issues of FileBasedSource. (1) Updates code so that a user-specified coder properly gets set to sub-sources. (2) Currently each SingleFileSource takes a reference to FileBasedSource while FileBasedSource takes a reference to Concatsource. ConcatSource has a reference to list of SingleFileSources. This results in quadratic space complexity when serializing splits of a FileBasedSource. This CL fixes this issue by making sure that FileBasedSource is cloned before taking a reference to ConcatSource Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93c5233a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93c5233a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93c5233a Branch: refs/heads/python-sdk Commit: 93c5233a1bf28e9b13412b909c2ee877bd6cf635 Parents: b83f12b Author: Chamikara JayalathAuthored: Thu Nov 17 19:18:26 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 18 13:33:33 2016 -0800 -- sdks/python/apache_beam/io/filebasedsource.py | 14 ++ .../python/apache_beam/io/filebasedsource_test.py | 18 +- 2 files changed, 27 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index c7bc27e..7d8f686 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -109,6 +109,12 @@ class FileBasedSource(iobase.BoundedSource): file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] sizes = FileBasedSource._estimate_sizes_in_parallel(file_names) + # We create a reference for FileBasedSource that will be serialized along + # with each _SingleFileSource. 
To prevent this FileBasedSource from having + # a reference to ConcatSource (resulting in quadratic space complexity) + # we clone it here. + file_based_source_ref = pickler.loads(pickler.dumps(self)) + for index, file_name in enumerate(file_names): if sizes[index] == 0: continue # Ignoring empty file. @@ -123,7 +129,7 @@ class FileBasedSource(iobase.BoundedSource): splittable = False single_file_source = _SingleFileSource( -self, file_name, +file_based_source_ref, file_name, 0, sizes[index], min_bundle_size=self._min_bundle_size, @@ -194,9 +200,6 @@ class FileBasedSource(iobase.BoundedSource): return self._get_concat_source().get_range_tracker(start_position, stop_position) - def default_output_coder(self): -return self._get_concat_source().default_output_coder() - def read_records(self, file_name, offset_range_tracker): """Returns a generator of records created by reading file 'file_name'. @@ -315,3 +318,6 @@ class _SingleFileSource(iobase.BoundedSource): def read(self, range_tracker): return self._file_based_source.read_records(self._file_name, range_tracker) + + def default_output_coder(self): +return self._file_based_source.default_output_coder() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource_test.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index 7f4d8d3..a455cd3 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -533,6 +533,23 @@ class TestFileBasedSource(unittest.TestCase): assert_that(pcoll, equal_to(lines)) pipeline.run() + def test_splits_get_coder_from_fbs(self): +class DummyCoder(object): + val = 12345 + +class FileBasedSourceWithCoder(LineSource): + + def default_output_coder(self): +return DummyCoder() + +pattern, expected_data = write_pattern([34, 66, 40, 24, 24, 12]) +self.assertEqual(200, len(expected_data)) +fbs = 
FileBasedSourceWithCoder(pattern) +splits = [split for split in fbs.split(desired_bundle_size=50)] +self.assertTrue(len(splits)) +for split in splits: + self.assertEqual(DummyCoder.val, split.source.default_output_coder().val) + class TestSingleFileSource(unittest.TestCase): @@ -685,7 +702,6 @@ class TestSingleFileSource(unittest.TestCase):
[2/2] incubator-beam git commit: Fix shared state across retry decorated functions
Fix shared state across retry decorated functions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7f689a7 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7f689a7 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7f689a7 Branch: refs/heads/python-sdk Commit: e7f689a72143cc5c821449c862c732f877ca4645 Parents: 115cf33 Author: Sourabh BajajAuthored: Tue Nov 15 15:04:37 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 17 12:21:43 2016 -0800 -- sdks/python/apache_beam/utils/retry.py | 8 ++--- sdks/python/apache_beam/utils/retry_test.py | 42 2 files changed, 45 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7f689a7/sdks/python/apache_beam/utils/retry.py -- diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 1f5af88..b3016fd 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -152,12 +152,10 @@ def with_exponential_backoff( def real_decorator(fun): """The real decorator whose purpose is to return the wrapped function.""" - -retry_intervals = iter( -FuzzedExponentialIntervals( -initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0)) - def wrapper(*args, **kwargs): + retry_intervals = iter( + FuzzedExponentialIntervals( + initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0)) while True: try: return fun(*args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7f689a7/sdks/python/apache_beam/utils/retry_test.py -- diff --git a/sdks/python/apache_beam/utils/retry_test.py b/sdks/python/apache_beam/utils/retry_test.py index 705c555..7570ca0 100644 --- a/sdks/python/apache_beam/utils/retry_test.py +++ b/sdks/python/apache_beam/utils/retry_test.py @@ -164,5 +164,47 @@ class RetryTest(unittest.TestCase): self.assertEqual(func_name, 'transient_failure') +class DummyClass(object): + def 
__init__(self, results): +self.index = 0 +self.results = results + + @retry.with_exponential_backoff(num_retries=2, initial_delay_secs=0.1) + def func(self): +self.index += 1 +if self.index > len(self.results) or \ +self.results[self.index - 1] == "Error": + raise ValueError("Error") +return self.results[self.index - 1] + + +class RetryStateTest(unittest.TestCase): + """The test_two_failures and test_single_failure would fail if we have + any shared state for the retry decorator. This test tries to prevent a bug we + found where the state in the decorator was shared across objects and retries + were not available correctly. + + The test_call_two_objects would test this inside the same test. + """ + def test_two_failures(self): +dummy = DummyClass(["Error", "Error", "Success"]) +dummy.func() +self.assertEqual(3, dummy.index) + + def test_single_failure(self): +dummy = DummyClass(["Error", "Success"]) +dummy.func() +self.assertEqual(2, dummy.index) + + def test_call_two_objects(self): +dummy = DummyClass(["Error", "Error", "Success"]) +dummy.func() +self.assertEqual(3, dummy.index) + +dummy2 = DummyClass(["Error", "Success"]) +dummy2.func() +self.assertEqual(2, dummy2.index) + + if __name__ == '__main__': unittest.main()
[1/2] incubator-beam git commit: Closes #1365
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 115cf33ed -> b83f12b9f Closes #1365 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b83f12b9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b83f12b9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b83f12b9 Branch: refs/heads/python-sdk Commit: b83f12b9fff05755323afd655966bf7c3ee03334 Parents: 115cf33 e7f689a Author: Robert BradshawAuthored: Thu Nov 17 12:21:43 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 17 12:21:43 2016 -0800 -- sdks/python/apache_beam/utils/retry.py | 8 ++--- sdks/python/apache_beam/utils/retry_test.py | 42 2 files changed, 45 insertions(+), 5 deletions(-) --
[1/2] incubator-beam git commit: Upgrade Datastore version
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 21b7844bb -> 115cf33ed Upgrade Datastore version Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/249c9f4c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/249c9f4c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/249c9f4c Branch: refs/heads/python-sdk Commit: 249c9f4cdfae17b1f760cbdeb2a7446439fd Parents: 21b7844 Author: Vikas KedigehalliAuthored: Thu Nov 17 10:43:35 2016 -0800 Committer: Vikas Kedigehalli Committed: Thu Nov 17 10:43:35 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/249c9f4c/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index af59069..b8034af 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -87,7 +87,7 @@ REQUIRED_PACKAGES = [ 'avro>=1.7.7,<2.0.0', 'dill>=0.2.5,<0.3', 'google-apitools>=0.5.2,<1.0.0', -'googledatastore==6.3.0', +'googledatastore==6.4.0', 'httplib2>=0.8,<0.10', 'mock>=1.0.1,<3.0.0', 'oauth2client>=2.0.1,<4.0.0',
[2/2] incubator-beam git commit: Closes #1375
Closes #1375 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/115cf33e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/115cf33e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/115cf33e Branch: refs/heads/python-sdk Commit: 115cf33ede4f6e0267a9ec8757620efb51ceb904 Parents: 21b7844 249c9f4 Author: Robert Bradshaw Authored: Thu Nov 17 12:20:41 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 17 12:20:41 2016 -0800 -- sdks/python/setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Query Splitter for Datastore v1
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d1fccbf5e -> 21b7844bb Query Splitter for Datastore v1 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1126b70 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1126b70 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1126b70 Branch: refs/heads/python-sdk Commit: c1126b708469fc63bd8ab8e54026700408ec34da Parents: d1fccbf Author: Vikas KedigehalliAuthored: Mon Oct 24 18:29:29 2016 -0700 Committer: Robert Bradshaw Committed: Tue Nov 15 14:24:02 2016 -0800 -- .../python/apache_beam/io/datastore/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/helper.py | 84 ++ .../apache_beam/io/datastore/v1/helper_test.py | 124 + .../io/datastore/v1/query_splitter.py | 270 +++ .../io/datastore/v1/query_splitter_test.py | 257 ++ sdks/python/setup.py| 1 + 7 files changed, 768 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/__init__.py -- diff --git a/sdks/python/apache_beam/io/datastore/__init__.py b/sdks/python/apache_beam/io/datastore/__init__.py new file mode 100644 index 000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/__init__.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py b/sdks/python/apache_beam/io/datastore/v1/__init__.py new file mode 100644 index 000..cce3aca --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/__init__.py @@ -0,0 +1,16 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper.py -- diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py b/sdks/python/apache_beam/io/datastore/v1/helper.py new file mode 100644 index 000..626ab35 --- /dev/null +++ b/sdks/python/apache_beam/io/datastore/v1/helper.py @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing
[2/2] incubator-beam git commit: Closes #1310
Closes #1310 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21b7844b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21b7844b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21b7844b Branch: refs/heads/python-sdk Commit: 21b7844bb05e9a86531876cffe8ee776bfaaa1cc Parents: d1fccbf c1126b7 Author: Robert BradshawAuthored: Tue Nov 15 14:24:03 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 14:24:03 2016 -0800 -- .../python/apache_beam/io/datastore/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/__init__.py | 16 ++ .../apache_beam/io/datastore/v1/helper.py | 84 ++ .../apache_beam/io/datastore/v1/helper_test.py | 124 + .../io/datastore/v1/query_splitter.py | 270 +++ .../io/datastore/v1/query_splitter_test.py | 257 ++ sdks/python/setup.py| 1 + 7 files changed, 768 insertions(+) --
[2/3] incubator-beam git commit: Fix merge lint error
Fix merge lint error Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bf25269 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bf25269 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bf25269 Branch: refs/heads/python-sdk Commit: 8bf2526965dd319f654e7f995df940307fb2260f Parents: 9d805ee Author: Robert BradshawAuthored: Tue Nov 15 11:06:27 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 11:06:27 2016 -0800 -- sdks/python/apache_beam/io/textio.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf25269/sdks/python/apache_beam/io/textio.py -- diff --git a/sdks/python/apache_beam/io/textio.py b/sdks/python/apache_beam/io/textio.py index 4e94f87..9c89b68 100644 --- a/sdks/python/apache_beam/io/textio.py +++ b/sdks/python/apache_beam/io/textio.py @@ -244,7 +244,8 @@ class ReadFromText(PTransform): self._strip_trailing_newlines = strip_trailing_newlines self._coder = coder self._source = _TextSource(file_pattern, min_bundle_size, compression_type, - strip_trailing_newlines, coder, validate=validate) + strip_trailing_newlines, coder, + validate=validate) def apply(self, pvalue): return pvalue.pipeline | Read(self._source)
[1/3] incubator-beam git commit: Display Data for: PipelineOptions, combiners, more sources
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 384fb5dc1 -> d1fccbf5e Display Data for: PipelineOptions, combiners, more sources Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d805eec Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d805eec Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d805eec Branch: refs/heads/python-sdk Commit: 9d805eec6b9cedb43b6e79e255483fc8fa6832d1 Parents: 384fb5d Author: PabloAuthored: Wed Nov 9 14:03:03 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 11:02:28 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 18 +++-- .../apache_beam/internal/json_value_test.py | 8 +- sdks/python/apache_beam/io/avroio.py| 20 - sdks/python/apache_beam/io/avroio_test.py | 78 sdks/python/apache_beam/io/fileio.py| 20 - sdks/python/apache_beam/io/fileio_test.py | 40 -- sdks/python/apache_beam/io/iobase.py| 5 +- sdks/python/apache_beam/io/textio.py| 25 +-- sdks/python/apache_beam/io/textio_test.py | 28 +++ sdks/python/apache_beam/pipeline_test.py| 12 +-- sdks/python/apache_beam/transforms/combiners.py | 14 .../apache_beam/transforms/combiners_test.py| 63 sdks/python/apache_beam/transforms/core.py | 16 +++- .../python/apache_beam/transforms/ptransform.py | 9 +++ sdks/python/apache_beam/utils/options.py| 17 - .../apache_beam/utils/pipeline_options_test.py | 46 ++-- sdks/python/setup.py| 3 + 17 files changed, 375 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 5ac9d6e..8992ec3 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -32,6 +32,7 @@ from apache_beam import utils from apache_beam.internal.auth import 
get_service_credentials from apache_beam.internal.json_value import to_json_value from apache_beam.transforms import cy_combiners +from apache_beam.transforms.display import DisplayData from apache_beam.utils import dependency from apache_beam.utils import retry from apache_beam.utils.dependency import get_required_container_version @@ -234,11 +235,18 @@ class Environment(object): self.proto.sdkPipelineOptions = ( dataflow.Environment.SdkPipelineOptionsValue()) - for k, v in sdk_pipeline_options.iteritems(): -if v is not None: - self.proto.sdkPipelineOptions.additionalProperties.append( - dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( - key=k, value=to_json_value(v))) + options_dict = {k: v + for k, v in sdk_pipeline_options.iteritems() + if v is not None} + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='options', value=to_json_value(options_dict))) + + dd = DisplayData.create_from(options) + items = [item.get_dict() for item in dd.items] + self.proto.sdkPipelineOptions.additionalProperties.append( + dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty( + key='display_data', value=to_json_value(items))) class Job(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/json_value_test.py -- diff --git a/sdks/python/apache_beam/internal/json_value_test.py b/sdks/python/apache_beam/internal/json_value_test.py index cfab293..a4a47b8 100644 --- a/sdks/python/apache_beam/internal/json_value_test.py +++ b/sdks/python/apache_beam/internal/json_value_test.py @@ -76,14 +76,8 @@ class JsonValueTest(unittest.TestCase): self.assertEquals(long(27), from_json_value(to_json_value(long(27 def test_too_long_value(self): -try: +with self.assertRaises(TypeError): to_json_value(long(1 << 64)) -except TypeError as e: - pass -except Exception as e: - self.fail('Unexpected exception raised: {}'.format(e)) -else: - 
self.fail('TypeError not raised.') if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio.py
[3/3] incubator-beam git commit: Closes #1264
Closes #1264 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d1fccbf5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d1fccbf5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d1fccbf5 Branch: refs/heads/python-sdk Commit: d1fccbf5eb5064a2a1a6831bb523f8cdf705c8d8 Parents: 384fb5d 8bf2526 Author: Robert BradshawAuthored: Tue Nov 15 11:06:50 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 11:06:50 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 18 +++-- .../apache_beam/internal/json_value_test.py | 8 +- sdks/python/apache_beam/io/avroio.py| 20 - sdks/python/apache_beam/io/avroio_test.py | 78 sdks/python/apache_beam/io/fileio.py| 20 - sdks/python/apache_beam/io/fileio_test.py | 40 -- sdks/python/apache_beam/io/iobase.py| 5 +- sdks/python/apache_beam/io/textio.py| 26 +-- sdks/python/apache_beam/io/textio_test.py | 28 +++ sdks/python/apache_beam/pipeline_test.py| 12 +-- sdks/python/apache_beam/transforms/combiners.py | 14 .../apache_beam/transforms/combiners_test.py| 63 sdks/python/apache_beam/transforms/core.py | 16 +++- .../python/apache_beam/transforms/ptransform.py | 9 +++ sdks/python/apache_beam/utils/options.py| 17 - .../apache_beam/utils/pipeline_options_test.py | 46 ++-- sdks/python/setup.py| 3 + 17 files changed, 376 insertions(+), 47 deletions(-) --
[2/3] incubator-beam git commit: Add a couple of missing coder tests.
Add a couple of missing coder tests. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab02a1d6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab02a1d6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab02a1d6 Branch: refs/heads/python-sdk Commit: ab02a1d60ce70f31c087a78a940bb4833b477ebb Parents: c4208a8 Author: Robert BradshawAuthored: Wed Nov 9 13:47:53 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 10:48:35 2016 -0800 -- .../apache_beam/coders/coders_test_common.py| 24 1 file changed, 19 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab02a1d6/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index adeb6a5..2ec8e7f 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -59,13 +59,9 @@ class CodersTest(unittest.TestCase): 'Base' not in c.__name__) standard -= set([coders.Coder, coders.FastCoder, - coders.Base64PickleCoder, - coders.FloatCoder, coders.ProtoCoder, - coders.TimestampCoder, coders.ToStringCoder, - coders.WindowCoder, - coders.WindowedValueCoder]) + coders.WindowCoder]) assert not standard - cls.seen, standard - cls.seen assert not standard - cls.seen_nested, standard - cls.seen_nested @@ -155,6 +151,9 @@ class CodersTest(unittest.TestCase): self.check_coder(coders.FloatCoder(), *[float(2 ** (0.1 * x)) for x in range(-100, 100)]) self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf')) +self.check_coder( +coders.TupleCoder((coders.FloatCoder(), coders.FloatCoder())), +(0, 1), (-100, 100), (0.5, 0.25)) def test_singleton_coder(self): a = 'anything' @@ -173,6 +172,9 @@ class CodersTest(unittest.TestCase): self.check_coder(coders.TimestampCoder(), 
timestamp.Timestamp(micros=-1234567890123456789), timestamp.Timestamp(micros=1234567890123456789)) +self.check_coder( +coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())), +(timestamp.Timestamp.of(27), 'abc')) def test_tuple_coder(self): self.check_coder( @@ -209,6 +211,18 @@ class CodersTest(unittest.TestCase): coders.IterableCoder(coders.VarIntCoder(, (1, [1, 2, 3])) + def test_windowed_value_coder(self): +self.check_coder( +coders.WindowedValueCoder(coders.VarIntCoder()), +windowed_value.WindowedValue(3, -100, ()), +windowed_value.WindowedValue(-1, 100, (1, 2, 3))) +self.check_coder( +coders.TupleCoder(( +coders.WindowedValueCoder(coders.FloatCoder()), +coders.WindowedValueCoder(coders.StrUtf8Coder(, +(windowed_value.WindowedValue(1.5, 0, ()), + windowed_value.WindowedValue("abc", 10, ('window', + def test_proto_coder(self): # For instructions on how these test proto message were generated, # see coders_test.py
[GitHub] incubator-beam pull request #1326: Additional Coder tests
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/1326 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Closes #1304
Closes #1304 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4208a89 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4208a89 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4208a89 Branch: refs/heads/python-sdk Commit: c4208a899c38a92acdd95ccf37cb53237a593535 Parents: 6ac6e42 d0e3121 Author: Robert BradshawAuthored: Tue Nov 15 08:53:07 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:53:07 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 1 + sdks/python/apache_beam/utils/names.py | 1 + 2 files changed, 2 insertions(+) --
[1/2] incubator-beam git commit: Allow for passing format so that we can migrate to BQ Avro export later
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6ac6e420f -> c4208a899 Allow for passing format so that we can migrate to BQ Avro export later Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d0e31218 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d0e31218 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d0e31218 Branch: refs/heads/python-sdk Commit: d0e312184e319050baa02abff2c08348b6cfb651 Parents: 6ac6e42 Author: Sourabh BajajAuthored: Mon Nov 7 18:15:17 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:53:06 2016 -0800 -- sdks/python/apache_beam/runners/dataflow_runner.py | 1 + sdks/python/apache_beam/utils/names.py | 1 + 2 files changed, 2 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0e31218/sdks/python/apache_beam/runners/dataflow_runner.py -- diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py index 57867fa..00b466b 100644 --- a/sdks/python/apache_beam/runners/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow_runner.py @@ -515,6 +515,7 @@ class DataflowPipelineRunner(PipelineRunner): elif transform.source.format == 'text': step.add_property(PropertyNames.FILE_PATTERN, transform.source.path) elif transform.source.format == 'bigquery': + step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON') # TODO(silviuc): Add table validation if transform.source.validate. 
if transform.source.table_reference is not None: step.add_property(PropertyNames.BIGQUERY_DATASET, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0e31218/sdks/python/apache_beam/utils/names.py -- diff --git a/sdks/python/apache_beam/utils/names.py b/sdks/python/apache_beam/utils/names.py index be8c92a..3edde3c 100644 --- a/sdks/python/apache_beam/utils/names.py +++ b/sdks/python/apache_beam/utils/names.py @@ -46,6 +46,7 @@ class PropertyNames(object): BIGQUERY_DATASET = 'dataset' BIGQUERY_QUERY = 'bigquery_query' BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql' + BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format' BIGQUERY_TABLE = 'table' BIGQUERY_PROJECT = 'project' BIGQUERY_SCHEMA = 'schema'
[1/2] incubator-beam git commit: Add IP configuration to Python SDK
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 66f324b35 -> 6ac6e420f Add IP configuration to Python SDK Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af6f4e90 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af6f4e90 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af6f4e90 Branch: refs/heads/python-sdk Commit: af6f4e90a239dbf1a2ad6f0cd8974602dfa9e9b4 Parents: 66f324b Author: Sam McVeetyAuthored: Fri Nov 11 19:56:34 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:50:31 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 9 + sdks/python/apache_beam/utils/options.py | 4 2 files changed, 13 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6f4e90/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 8c7cc29..5ac9d6e 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -210,6 +210,15 @@ class Environment(object): pool.teardownPolicy = ( dataflow.WorkerPool .TeardownPolicyValueValuesEnum.TEARDOWN_ON_SUCCESS) +if self.worker_options.use_public_ips is not None: + if self.worker_options.use_public_ips: +pool.ipConfiguration = ( +dataflow.WorkerPool +.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC) + else: +pool.ipConfiguration = ( +dataflow.WorkerPool +.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE) if self.standard_options.streaming: # Use separate data disk for streaming. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6f4e90/sdks/python/apache_beam/utils/options.py -- diff --git a/sdks/python/apache_beam/utils/options.py b/sdks/python/apache_beam/utils/options.py index ecc85ba..f68335b 100644 --- a/sdks/python/apache_beam/utils/options.py +++ b/sdks/python/apache_beam/utils/options.py @@ -348,6 +348,10 @@ class WorkerOptions(PipelineOptions): help= ('The teardown policy for the VMs. By default this is left unset and ' 'the service sets the default policy.')) +parser.add_argument( +'--use_public_ips', +default=None, +help='Whether to assign public IP addresses to the worker machines.') def validate(self, validator): errors = []
[2/2] incubator-beam git commit: Closes #1354
Closes #1354 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ac6e420 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ac6e420 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ac6e420 Branch: refs/heads/python-sdk Commit: 6ac6e420f4f91e3326b07753004d3a56f34b4226 Parents: 66f324b af6f4e9 Author: Robert BradshawAuthored: Tue Nov 15 08:50:32 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:50:32 2016 -0800 -- sdks/python/apache_beam/internal/apiclient.py | 9 + sdks/python/apache_beam/utils/options.py | 4 2 files changed, 13 insertions(+) --
[1/2] incubator-beam git commit: Use batch GCS operations during FileSink write finalization
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 560fe79f8 -> 66f324b35 Use batch GCS operations during FileSink write finalization Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/313191e1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/313191e1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/313191e1 Branch: refs/heads/python-sdk Commit: 313191e129b884e4e14e9f503a757147d368217c Parents: 560fe79 Author: Charles ChenAuthored: Thu Nov 10 11:54:08 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:48:47 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 177 - sdks/python/apache_beam/io/fileio_test.py | 2 +- sdks/python/apache_beam/io/gcsio.py | 78 +++ sdks/python/apache_beam/io/gcsio_test.py | 103 +- sdks/python/apache_beam/utils/retry.py| 3 + 5 files changed, 298 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio.py -- diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py index 669bfc9..ef20a7c 100644 --- a/sdks/python/apache_beam/io/fileio.py +++ b/sdks/python/apache_beam/io/fileio.py @@ -31,6 +31,7 @@ import zlib import weakref from apache_beam import coders +from apache_beam.io import gcsio from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.runners.dataflow.native_io import iobase as dataflow_io @@ -451,8 +452,6 @@ class ChannelFactory(object): 'was %s' % type(compression_type)) if path.startswith('gs://'): - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio raw_file = gcsio.GcsIO().open( path, mode, @@ -470,40 +469,92 @@ class ChannelFactory(object): return isinstance(fileobj, _CompressedFile) @staticmethod - def rename(src, dst): + def rename(src, dest): if src.startswith('gs://'): - assert 
dst.startswith('gs://'), dst - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.io import gcsio - gcsio.GcsIO().rename(src, dst) + if not dest.startswith('gs://'): +raise ValueError('Destination %r must be GCS path.', dest) + gcsio.GcsIO().rename(src, dest) else: try: -os.rename(src, dst) +os.rename(src, dest) except OSError as err: raise IOError(err) @staticmethod - def copytree(src, dst): + def rename_batch(src_dest_pairs): +# Filter out local and GCS operations. +local_src_dest_pairs = [] +gcs_src_dest_pairs = [] +for src, dest in src_dest_pairs: + if src.startswith('gs://'): +if not dest.startswith('gs://'): + raise ValueError('Destination %r must be GCS path.', dest) +gcs_src_dest_pairs.append((src, dest)) + else: +local_src_dest_pairs.append((src, dest)) + +# Execute local operations. +exceptions = [] +for src, dest in local_src_dest_pairs: + try: +ChannelFactory.rename(src, dest) + except Exception as e: # pylint: disable=broad-except +exceptions.append((src, dest, e)) + +# Execute GCS operations. +exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs) + +return exceptions + + @staticmethod + def _rename_gcs_batch(src_dest_pairs): +# Prepare batches. +gcs_batches = [] +gcs_current_batch = [] +for src, dest in src_dest_pairs: + if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE: +gcs_batches.append(gcs_current_batch) +gcs_current_batch = [] +if gcs_current_batch: + gcs_batches.append(gcs_current_batch) + +# Execute GCS renames if any and return exceptions. 
+exceptions = [] +for batch in gcs_batches: + copy_statuses = gcsio.GcsIO().copy_batch(batch) + copy_succeeded = [] + for src, dest, exception in copy_statuses: +if exception: + exceptions.append((src, dest, exception)) +else: + copy_succeeded.append((src, dest)) + delete_batch = [src for src, dest in copy_succeeded] + delete_statuses = gcsio.GcsIO().delete_batch(delete_batch) + for i, (src, exception) in enumerate(delete_statuses): +dest = copy_succeeded[i] +if exception: + exceptions.append((src, dest, exception)) +return exceptions + + @staticmethod + def copytree(src, dest): if src.startswith('gs://'): - assert dst.startswith('gs://'), dst + if not dest.startswith('gs://'): +
[2/2] incubator-beam git commit: Closes #1337
Closes #1337 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66f324b3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66f324b3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66f324b3 Branch: refs/heads/python-sdk Commit: 66f324b350c50720cc88357bc2d56e2ecd99adc8 Parents: 560fe79 313191e Author: Robert BradshawAuthored: Tue Nov 15 08:48:48 2016 -0800 Committer: Robert Bradshaw Committed: Tue Nov 15 08:48:48 2016 -0800 -- sdks/python/apache_beam/io/fileio.py | 177 - sdks/python/apache_beam/io/fileio_test.py | 2 +- sdks/python/apache_beam/io/gcsio.py | 78 +++ sdks/python/apache_beam/io/gcsio_test.py | 103 +- sdks/python/apache_beam/utils/retry.py| 3 + 5 files changed, 298 insertions(+), 65 deletions(-) --
[1/2] incubator-beam git commit: [BEAM-852] Add validation to file based sources during create time
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 15e78b28a -> 560fe79f8 [BEAM-852] Add validation to file based sources during create time Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76ad2929 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76ad2929 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76ad2929 Branch: refs/heads/python-sdk Commit: 76ad29296fd57e1eec97bf40d9cf3a1d54a63a3f Parents: 15e78b2 Author: Sourabh BajajAuthored: Mon Nov 14 15:40:10 2016 -0800 Committer: Robert Bradshaw Committed: Mon Nov 14 15:40:10 2016 -0800 -- sdks/python/apache_beam/io/avroio.py| 8 +++- sdks/python/apache_beam/io/bigquery.py | 2 +- sdks/python/apache_beam/io/filebasedsource.py | 16 +++- .../apache_beam/io/filebasedsource_test.py | 41 ++-- sdks/python/apache_beam/io/textio.py| 13 +-- 5 files changed, 60 insertions(+), 20 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/avroio.py -- diff --git a/sdks/python/apache_beam/io/avroio.py b/sdks/python/apache_beam/io/avroio.py index 53ed95a..e7e73dd 100644 --- a/sdks/python/apache_beam/io/avroio.py +++ b/sdks/python/apache_beam/io/avroio.py @@ -37,7 +37,7 @@ __all__ = ['ReadFromAvro', 'WriteToAvro'] class ReadFromAvro(PTransform): """A ``PTransform`` for reading avro files.""" - def __init__(self, file_pattern=None, min_bundle_size=0): + def __init__(self, file_pattern=None, min_bundle_size=0, validate=True): """Initializes ``ReadFromAvro``. Uses source '_AvroSource' to read a set of Avro files defined by a given @@ -70,13 +70,17 @@ class ReadFromAvro(PTransform): file_pattern: the set of files to be read. min_bundle_size: the minimum size in bytes, to be considered when splitting the input into bundles. + validate: flag to verify that the files exist during the pipeline +creation time. 
**kwargs: Additional keyword arguments to be passed to the base class. """ super(ReadFromAvro, self).__init__() self._args = (file_pattern, min_bundle_size) +self._validate = validate def apply(self, pvalue): -return pvalue.pipeline | Read(_AvroSource(*self._args)) +return pvalue.pipeline | Read(_AvroSource(*self._args, + validate=self._validate)) class _AvroUtils(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/bigquery.py -- diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py index f0e88a6..8d7892a 100644 --- a/sdks/python/apache_beam/io/bigquery.py +++ b/sdks/python/apache_beam/io/bigquery.py @@ -65,7 +65,7 @@ input entails querying the table for all its rows. The coder argument on BigQuerySource controls the reading of the lines in the export files (i.e., transform a JSON object into a PCollection element). The coder is not involved when the same table is read as a side input since there is no intermediate -format involved. We get the table rows directly from the BigQuery service with +format involved. We get the table rows directly from the BigQuery service with a query. Users may provide a query to read from rather than reading all of a BigQuery http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/filebasedsource.py -- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 58ad118..c7bc27e 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -50,7 +50,8 @@ class FileBasedSource(iobase.BoundedSource): file_pattern, min_bundle_size=0, compression_type=fileio.CompressionTypes.AUTO, - splittable=True): + splittable=True, + validate=True): """Initializes ``FileBasedSource``. 
Args: @@ -68,10 +69,13 @@ class FileBasedSource(iobase.BoundedSource): the file, for example, for compressed files where currently it is not possible to efficiently read a data range without decompressing the whole file. + validate: Boolean flag to verify that the files exist during the pipeline +creation time. Raises: TypeError: when
[2/2] incubator-beam git commit: Closes #1220
Closes #1220 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/560fe79f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/560fe79f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/560fe79f Branch: refs/heads/python-sdk Commit: 560fe79f875b9d08b7256e6bf653a48b19f0ccb5 Parents: 15e78b2 76ad292 Author: Robert BradshawAuthored: Mon Nov 14 15:41:00 2016 -0800 Committer: Robert Bradshaw Committed: Mon Nov 14 15:41:00 2016 -0800 -- sdks/python/apache_beam/io/avroio.py| 8 +++- sdks/python/apache_beam/io/bigquery.py | 2 +- sdks/python/apache_beam/io/filebasedsource.py | 16 +++- .../apache_beam/io/filebasedsource_test.py | 41 ++-- sdks/python/apache_beam/io/textio.py| 13 +-- 5 files changed, 60 insertions(+), 20 deletions(-) --
[2/2] incubator-beam git commit: Closes #1349
Closes #1349 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e78b28 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e78b28 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e78b28 Branch: refs/heads/python-sdk Commit: 15e78b28a63f0987d7e361f5f5b4c9b6be532316 Parents: 6fb4169 1fc9f70 Author: Robert BradshawAuthored: Fri Nov 11 11:51:21 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 11 11:51:21 2016 -0800 -- sdks/python/apache_beam/utils/windowed_value.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Remove the inline from WindowedValue.create()
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 6fb416989 -> 15e78b28a Remove the inline from WindowedValue.create() Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fc9f70b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fc9f70b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fc9f70b Branch: refs/heads/python-sdk Commit: 1fc9f70bfeec18d62a4141a44f3bbd40151efd85 Parents: 6fb4169 Author: Ahmet AltayAuthored: Fri Nov 11 11:43:18 2016 -0800 Committer: Ahmet Altay Committed: Fri Nov 11 11:43:18 2016 -0800 -- sdks/python/apache_beam/utils/windowed_value.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc9f70b/sdks/python/apache_beam/utils/windowed_value.pxd -- diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd b/sdks/python/apache_beam/utils/windowed_value.pxd index 8799914..41c2986 100644 --- a/sdks/python/apache_beam/utils/windowed_value.pxd +++ b/sdks/python/apache_beam/utils/windowed_value.pxd @@ -34,5 +34,5 @@ cdef class WindowedValue(object): cdef inline bint _typed_eq(WindowedValue left, WindowedValue right) except? -2 @cython.locals(wv=WindowedValue) -cdef inline WindowedValue create( +cdef WindowedValue create( object value, int64_t timestamp_micros, object windows)
[1/2] incubator-beam git commit: Closes #1346
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 778194f22 -> 6fb416989 Closes #1346 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fb41698 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fb41698 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fb41698 Branch: refs/heads/python-sdk Commit: 6fb416989128519cb541bced107facae9012f51c Parents: 778194f 6f93cd5 Author: Robert BradshawAuthored: Fri Nov 11 00:23:45 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 11 00:23:45 2016 -0800 -- .../examples/snippets/snippets_test.py | 14 ++--- sdks/python/apache_beam/pipeline_test.py| 5 +++ .../apache_beam/runners/direct/direct_runner.py | 2 +- .../apache_beam/runners/direct/executor.py | 33 .../runners/direct/transform_evaluator.py | 8 +++-- 5 files changed, 41 insertions(+), 21 deletions(-) --
[2/2] incubator-beam git commit: DirectPipelineRunner bug fixes.
DirectPipelineRunner bug fixes. - Execute empty [] | pipelines to the end. - use pickler to serialize/deserialize DoFns instead of deepcopy similar to the othe execution environments. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f93cd58 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f93cd58 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f93cd58 Branch: refs/heads/python-sdk Commit: 6f93cd5884797c0880766c7737e106765becf96d Parents: 778194f Author: Ahmet AltayAuthored: Thu Nov 10 17:37:45 2016 -0800 Committer: Robert Bradshaw Committed: Fri Nov 11 00:23:45 2016 -0800 -- .../examples/snippets/snippets_test.py | 14 ++--- sdks/python/apache_beam/pipeline_test.py| 5 +++ .../apache_beam/runners/direct/direct_runner.py | 2 +- .../apache_beam/runners/direct/executor.py | 33 .../runners/direct/transform_evaluator.py | 8 +++-- 5 files changed, 41 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/examples/snippets/snippets_test.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index edc0a17..72fccb2 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -29,6 +29,8 @@ from apache_beam import io from apache_beam import pvalue from apache_beam import typehints from apache_beam.io import fileio +from apache_beam.transforms.util import assert_that +from apache_beam.transforms.util import equal_to from apache_beam.utils.options import TypeOptions from apache_beam.examples.snippets import snippets @@ -307,7 +309,9 @@ class TypeHintsTest(unittest.TestCase): # [END type_hints_runtime_on] def test_deterministic_key(self): -lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'] +p = 
beam.Pipeline('DirectPipelineRunner') +lines = (p | beam.Create( +['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3'])) # [START type_hints_deterministic_key] class Player(object): @@ -338,9 +342,11 @@ class TypeHintsTest(unittest.TestCase): beam.typehints.Tuple[Player, int])) # [END type_hints_deterministic_key] -self.assertEquals( -{('banana', 3), ('kiwi', 4), ('zucchini', 3)}, -set(totals | beam.Map(lambda (k, v): (k.name, v +assert_that( +totals | beam.Map(lambda (k, v): (k.name, v)), +equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)])) + +p.run() class SnippetsTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index a4c983f..013796c 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -24,6 +24,7 @@ from apache_beam.pipeline import Pipeline from apache_beam.pipeline import PipelineOptions from apache_beam.pipeline import PipelineVisitor from apache_beam.runners.dataflow.native_io.iobase import NativeSource +from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Map @@ -217,6 +218,10 @@ class PipelineTest(unittest.TestCase): pipeline.run() + def test_aggregator_empty_input(self): +actual = [] | CombineGlobally(max).without_defaults() +self.assertEqual(actual, []) + def test_pipeline_as_context(self): def raise_exception(exn): raise exn http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/direct_runner.py -- diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py b/sdks/python/apache_beam/runners/direct/direct_runner.py index 2e5fe74..1afd486 100644 --- a/sdks/python/apache_beam/runners/direct/direct_runner.py +++ 
b/sdks/python/apache_beam/runners/direct/direct_runner.py @@ -125,7 +125,7 @@ class BufferingInMemoryCache(object): for key, value in self._cache.iteritems(): applied_ptransform, tag = key self._pvalue_cache.cache_output(applied_ptransform, tag, value) - self._cache = None +self._cache = None class DirectPipelineResult(PipelineResult):
[2/2] incubator-beam git commit: Closes #1330
Closes #1330 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/778194f2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/778194f2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/778194f2 Branch: refs/heads/python-sdk Commit: 778194f22ae850195c270991499f988f8fe50972 Parents: ec00c53 4827ae8 Author: Robert BradshawAuthored: Thu Nov 10 14:05:02 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 10 14:05:02 2016 -0800 -- sdks/python/run_postcommit.sh | 3 +++ 1 file changed, 3 insertions(+) --
[1/2] incubator-beam git commit: Remove tox cache from previous workspace
Repository: incubator-beam Updated Branches: refs/heads/python-sdk ec00c530c -> 778194f22 Remove tox cache from previous workspace Jenkins doesn't cleanup the previous workspace, and since .tox is in the gitignore, we must explicitly delete it. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4827ae84 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4827ae84 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4827ae84 Branch: refs/heads/python-sdk Commit: 4827ae840797eb43aa7e4265dad7112804b5fb85 Parents: ec00c53 Author: Vikas KedigehalliAuthored: Wed Nov 9 17:25:15 2016 -0800 Committer: Robert Bradshaw Committed: Thu Nov 10 14:05:01 2016 -0800 -- sdks/python/run_postcommit.sh | 3 +++ 1 file changed, 3 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4827ae84/sdks/python/run_postcommit.sh -- diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh index 23dd516..2cd40da 100755 --- a/sdks/python/run_postcommit.sh +++ b/sdks/python/run_postcommit.sh @@ -31,6 +31,9 @@ set -v # pip install --user installation location. LOCAL_PATH=$HOME/.local/bin/ +# Remove any tox cache from previous workspace +rm -rf sdks/python/.tox + # INFRA does not install virtualenv pip install virtualenv --user
[GitHub] incubator-beam pull request #1326: Additional Coder tests
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/1326 Additional Coder tests I noticed these were missing while merging another PR. R: @mariapython You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam coder-tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1326.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1326 commit 4f8c3060266fc857908187e365f8a44b49fdee00 Author: Robert Bradshaw <rober...@google.com> Date: 2016-11-09T21:47:53Z Add a couple of missing coder tests. commit 3f8b2c7e52448798c166751cd74be54518650ee9 Author: Robert Bradshaw <rober...@google.com> Date: 2016-11-09T21:56:31Z Also check coder determinism. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #1253: Optimize WindowedValueCoder
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/1253 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Closes #1253
Closes #1253 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec00c530 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec00c530 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec00c530 Branch: refs/heads/python-sdk Commit: ec00c530c9a54ce61095214fcce7b69a7c653d95 Parents: ea64242 0c90fb8 Author: Robert Bradshaw Authored: Wed Nov 9 13:26:45 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 9 13:26:45 2016 -0800 -- sdks/python/apache_beam/coders/coder_impl.pxd | 4 sdks/python/apache_beam/coders/coder_impl.py| 21 ++-- .../apache_beam/coders/coders_test_common.py| 8 ++-- 3 files changed, 21 insertions(+), 12 deletions(-) --
[1/2] incubator-beam git commit: Optimize WindowedValueCoder
Repository: incubator-beam Updated Branches: refs/heads/python-sdk ea642428f -> ec00c530c Optimize WindowedValueCoder Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c90fb80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c90fb80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c90fb80 Branch: refs/heads/python-sdk Commit: 0c90fb80f3961848aa82667e8891cfebf4dbc351 Parents: ea64242 Author: Robert Bradshaw <rober...@google.com> Authored: Tue Nov 1 16:12:19 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Nov 9 13:26:44 2016 -0800 -- sdks/python/apache_beam/coders/coder_impl.pxd | 4 sdks/python/apache_beam/coders/coder_impl.py| 21 ++-- .../apache_beam/coders/coders_test_common.py| 8 ++-- 3 files changed, 21 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.pxd -- diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd b/sdks/python/apache_beam/coders/coder_impl.pxd index e021c2e..7ff 100644 --- a/sdks/python/apache_beam/coders/coder_impl.pxd +++ b/sdks/python/apache_beam/coders/coder_impl.pxd @@ -26,6 +26,7 @@ cimport libc.stdlib cimport libc.string from .stream cimport InputStream, OutputStream +from apache_beam.utils cimport windowed_value cdef object loads, dumps, create_InputStream, create_OutputStream, ByteCountingOutputStream, get_varint_size @@ -137,3 +138,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl): @cython.locals(c=CoderImpl) cpdef get_estimated_size_and_observables(self, value, bint nested=?) 
+ + @cython.locals(wv=windowed_value.WindowedValue) + cpdef encode_to_stream(self, value, OutputStream stream, bint nested) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.py -- diff --git a/sdks/python/apache_beam/coders/coder_impl.py b/sdks/python/apache_beam/coders/coder_impl.py index d075814..47a837f 100644 --- a/sdks/python/apache_beam/coders/coder_impl.py +++ b/sdks/python/apache_beam/coders/coder_impl.py @@ -29,7 +29,7 @@ from types import NoneType from apache_beam.coders import observable from apache_beam.utils.timestamp import Timestamp -from apache_beam.utils.windowed_value import WindowedValue +from apache_beam.utils import windowed_value # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports try: @@ -535,19 +535,28 @@ class WindowedValueCoderImpl(StreamCoderImpl): """A coder for windowed values.""" def __init__(self, value_coder, timestamp_coder, window_coder): +# TODO(robertwb): Do we need the ability to customize timestamp_coder? self._value_coder = value_coder self._timestamp_coder = timestamp_coder self._windows_coder = TupleSequenceCoderImpl(window_coder) def encode_to_stream(self, value, out, nested): -self._value_coder.encode_to_stream(value.value, out, True) -self._timestamp_coder.encode_to_stream(value.timestamp, out, True) -self._windows_coder.encode_to_stream(value.windows, out, True) +wv = value # type cast +self._value_coder.encode_to_stream(wv.value, out, True) +if isinstance(self._timestamp_coder, TimestampCoderImpl): + # Avoid creation of Timestamp object. 
+ out.write_bigendian_int64(wv.timestamp_micros) +else: + self._timestamp_coder.encode_to_stream(wv.timestamp, out, True) +self._windows_coder.encode_to_stream(wv.windows, out, True) def decode_from_stream(self, in_stream, nested): -return WindowedValue( +return windowed_value.create( self._value_coder.decode_from_stream(in_stream, True), -self._timestamp_coder.decode_from_stream(in_stream, True), +# Avoid creation of Timestamp object. +in_stream.read_bigendian_int64() +if isinstance(self._timestamp_coder, TimestampCoderImpl) +else self._timestamp_coder.decode_from_stream(in_stream, True).micros, self._windows_coder.decode_from_stream(in_stream, True)) def get_estimated_size_and_observables(self, value, nested=False): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 1af8347..adeb6a5 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_bea
[GitHub] incubator-beam pull request #1323: Testing pr 1212
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/1323 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[2/2] incubator-beam git commit: Closes #1212
Closes #1212 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ea642428 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ea642428 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ea642428 Branch: refs/heads/python-sdk Commit: ea642428f3abf8e074ee52092d10616e8ad0c33a Parents: cf026bb 3c1043a Author: Robert BradshawAuthored: Wed Nov 9 13:24:58 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 9 13:24:58 2016 -0800 -- sdks/python/apache_beam/io/bigquery.py | 32 + sdks/python/apache_beam/io/bigquery_test.py | 68 - sdks/python/apache_beam/io/filebasedsource.py | 6 + .../apache_beam/io/filebasedsource_test.py | 44 -- sdks/python/apache_beam/io/fileio.py| 20 +++ sdks/python/apache_beam/io/fileio_test.py | 62 - sdks/python/apache_beam/io/iobase.py| 12 +- sdks/python/apache_beam/io/pubsub.py| 15 ++ sdks/python/apache_beam/io/pubsub_test.py | 62 + .../runners/dataflow/native_io/iobase.py| 5 +- sdks/python/apache_beam/transforms/core.py | 24 +++- sdks/python/apache_beam/transforms/display.py | 72 +++--- .../apache_beam/transforms/display_test.py | 138 ++- .../apache_beam/transforms/ptransform_test.py | 58 14 files changed, 548 insertions(+), 70 deletions(-) --
[1/2] incubator-beam git commit: Adding display data to sink, sources, and parallel-do operations.
ache_beam/runners/dataflow/native_io/iobase.py -- diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py index 9621f4c..32da3a2 100644 --- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py +++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py @@ -22,6 +22,7 @@ import logging from apache_beam import pvalue from apache_beam.transforms import ptransform +from apache_beam.transforms.display import HasDisplayData def _dict_printable_fields(dict_object, skip_fields): @@ -38,7 +39,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder', 'compression_type'] -class NativeSource(object): +class NativeSource(HasDisplayData): """A source implemented by Dataflow service. This class is to be only inherited by sources natively implemented by Cloud @@ -244,7 +245,7 @@ class DynamicSplitResultWithPosition(DynamicSplitResult): self.stop_position = stop_position -class NativeSink(object): +class NativeSink(HasDisplayData): """A sink implemented by Dataflow service. 
This class is to be only inherited by sinks natively implemented by Cloud http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/core.py -- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 3b5816a..3189de7 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -29,7 +29,7 @@ from apache_beam.coders import typecoders from apache_beam.internal import util from apache_beam.transforms import ptransform from apache_beam.transforms import window -from apache_beam.transforms.display import HasDisplayData +from apache_beam.transforms.display import HasDisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import PTransformWithSideInputs from apache_beam.transforms.window import MIN_TIMESTAMP @@ -235,6 +235,16 @@ class CallableWrapperDoFn(DoFn): super(CallableWrapperDoFn, self).__init__() + def display_data(self): +# If the callable has a name, then it's likely a function, and +# we show its name. +# Otherwise, it might be an instance of a callable class. We +# show its class. +display_data_value = (self._fn.__name__ if hasattr(self._fn, '__name__') + else self._fn.__class__) +return {'fn': DisplayDataItem(display_data_value, + label='Transform Function')} + def __repr__(self): return 'CallableWrapperDoFn(%s)' % self._fn @@ -580,6 +590,11 @@ class ParDo(PTransformWithSideInputs): def process_argspec_fn(self): return self.fn.process_argspec_fn() + def display_data(self): +return {'fn': DisplayDataItem(self.fn.__class__, + label='Transform Function'), +'fn_dd': self.fn} + def apply(self, pcoll): self.side_output_tags = set() # TODO(robertwb): Change all uses of the dofn attribute to use fn instead. @@ -696,6 +711,10 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name else: wrapper = lambda x: [fn(x)] + # TODO. 
What about callable classes? + if hasattr(fn, '__name__'): +wrapper.__name__ = fn.__name__ + # Proxy the type-hint information from the original function to this new # wrapped function. get_type_hints(wrapper).input_types = get_type_hints(fn).input_types @@ -739,6 +758,9 @@ def Filter(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name % (fn, 'first' if label is None else 'second')) wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else [] + # TODO: What about callable classes? + if hasattr(fn, '__name__'): +wrapper.__name__ = fn.__name__ # Proxy the type-hint information from the function being wrapped, setting the # output type to be the same as the input type. get_type_hints(wrapper).input_types = get_type_hints(fn).input_types http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/display.py -- diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py index e93d560..365abaf 100644 --- a/sdks/python/apache_beam/transforms/display.py +++ b/sdks/python/apache_beam/transforms/display.py @@ -93,6 +93,8 @@ class DisplayData(object): continue if isinstance(element, DisplayDataItem): +if element.should_drop(): + continue element.key = key element.namespace = self.namespace self.items.append(element) @@ -132,6 +134,
[GitHub] incubator-beam pull request #1323: Testing pr 1212
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/1323 Testing pr 1212 You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam github-pr-1212 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1323.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1323 commit 51dc6d70bad0b537738cbee3338a57dbc58fb1bd Author: Robert Bradshaw <rober...@google.com> Date: 2016-11-09T19:49:43Z Closes #1321 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] incubator-beam git commit: Add hamcrest dependency.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk aa603872c -> cf026bb55 Add hamcrest dependency. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6bc59fd1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6bc59fd1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6bc59fd1 Branch: refs/heads/python-sdk Commit: 6bc59fd12b5e2911f501aefdd8083c6bf65d001b Parents: aa60387 Author: Robert BradshawAuthored: Wed Nov 9 10:51:58 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 9 10:51:58 2016 -0800 -- sdks/python/setup.py | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bc59fd1/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 1aa3eb3..4010b06 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -97,6 +97,10 @@ REQUIRED_PACKAGES = [ ] +REQUIRED_TEST_PACKAGES = [ +'pyhamcrest>=1.9,<2.0', +] + setuptools.setup( name=PACKAGE_NAME, version=PACKAGE_VERSION, @@ -119,6 +123,7 @@ setuptools.setup( setup_requires=['nose>=1.0'], install_requires=REQUIRED_PACKAGES, test_suite='nose.collector', +tests_require=REQUIRED_TEST_PACKAGES, zip_safe=False, # PyPI package information. classifiers=[
[2/2] incubator-beam git commit: Closes #1321
Closes #1321 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf026bb5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf026bb5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf026bb5 Branch: refs/heads/python-sdk Commit: cf026bb55362f2faa843374bcdd67b7e020b8f87 Parents: aa60387 6bc59fd Author: Robert Bradshaw Authored: Wed Nov 9 11:49:43 2016 -0800 Committer: Robert Bradshaw Committed: Wed Nov 9 11:49:43 2016 -0800 -- sdks/python/setup.py | 5 + 1 file changed, 5 insertions(+) --
[GitHub] incubator-beam pull request #1321: Add hamcrest dependency.
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/1321 Add hamcrest dependency. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam hamcrest Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/1321.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #1321 commit 6bc59fd12b5e2911f501aefdd8083c6bf65d001b Author: Robert Bradshaw <rober...@google.com> Date: 2016-11-09T18:51:58Z Add hamcrest dependency. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[4/5] incubator-beam git commit: Renames InprocessPipelineRunner to DirectPipelineRunner and removes the existing DirectPipelineRunner. Renamed the folder to direct to keep all related files in the sa
# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -#http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""DirectPipelineRunner, executing on the local machine. - -The DirectPipelineRunner class implements what is called in Dataflow -parlance the "direct runner". Such a runner executes the entire graph -of transformations belonging to a pipeline on the local machine. -""" - -from __future__ import absolute_import - -import collections -import itertools -import logging - -from apache_beam import coders -from apache_beam import error -from apache_beam.runners.common import DoFnRunner -from apache_beam.runners.common import DoFnState -from apache_beam.runners.runner import PipelineResult -from apache_beam.runners.runner import PipelineRunner -from apache_beam.runners.runner import PipelineState -from apache_beam.runners.runner import PValueCache -from apache_beam.transforms import sideinputs -from apache_beam.transforms.window import GlobalWindows -from apache_beam.transforms.window import WindowedValue -from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn -from apache_beam.typehints.typecheck import TypeCheckError -from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn -from apache_beam.utils import counters -from apache_beam.utils.options import TypeOptions - - -class DirectPipelineRunner(PipelineRunner): - """A local pipeline runner. - - The runner computes everything locally and does not make any attempt to - optimize for time or space. - """ - - def __init__(self, cache=None): -# Cache of values computed while the runner executes a pipeline. 
-self._cache = cache if cache is not None else PValueCache() -self._counter_factory = counters.CounterFactory() -# Element counts used for debugging footprint issues in the direct runner. -# The values computed are used only for logging and do not take part in -# any decision making logic. The key for the counter dictionary is either -# the full label for the transform producing the elements or a tuple -# (full label, output tag) for ParDo transforms since they can output values -# on multiple outputs. -self.debug_counters = {} -self.debug_counters['element_counts'] = collections.Counter() - - @property - def cache(self): -return self._cache - - def get_pvalue(self, pvalue): -"""Gets the PValue's computed value from the runner's cache.""" -try: - return self._cache.get_pvalue(pvalue) -except KeyError: - raise error.PValueError('PValue is not computed.') - - def clear_pvalue(self, pvalue): -"""Removes a PValue from the runner's cache.""" -self._cache.clear_pvalue(pvalue) - - def skip_if_cached(func): # pylint: disable=no-self-argument -"""Decorator to skip execution of a transform if value is cached.""" - -def func_wrapper(self, pvalue, *args, **kwargs): - logging.debug('Current: Debug counters: %s', self.debug_counters) - if self._cache.is_cached(pvalue): # pylint: disable=protected-access -return - else: -func(self, pvalue, *args, **kwargs) -return func_wrapper - - def run(self, pipeline): -super(DirectPipelineRunner, self).run(pipeline) -logging.info('Final: Debug counters: %s', self.debug_counters) -return DirectPipelineResult(state=PipelineState.DONE, -counter_factory=self._counter_factory) - - @skip_if_cached - def run_CreatePCollectionView(self, transform_node): -transform = transform_node.transform -view = transform.view -values = self._cache.get_pvalue(transform_node.inputs[0]) - result = sideinputs.SideInputMap(type(view), view._view_options(), values) -self._cache.cache_output(transform_node, result) - - @skip_if_cached - def run_ParDo(self, 
transform_node): -transform = transform_node.transform - -side_inputs = [self._cache.get_pvalue(view) - for view in transform_node.side_inputs] - -# TODO(robertwb): Do this type checking inside DoFnRunner to get it on -# remote workers as well? -options = transform_node.inputs[0].pipeline.options -if options is not None and options.view_as(TypeOptions).runtime_type_check: - transform.dofn = TypeCheckWrapperDoFn( - transform.dofn, transform.get_type_hints()) - -# TODO(robertwb): Should this be conditionally done on the workers as well? -transform.dofn = OutputChec
[2/5] incubator-beam git commit: Add a test to check memory consumption of the direct runner.
Add a test to check memory consumption of the direct runner. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d021e9ce Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d021e9ce Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d021e9ce Branch: refs/heads/python-sdk Commit: d021e9ce7bccb770e187c720bd7a95bb99f1bc8e Parents: 7f201cb Author: Ahmet AltayAuthored: Thu Nov 3 15:37:49 2016 -0700 Committer: Robert Bradshaw Committed: Mon Nov 7 17:56:49 2016 -0800 -- sdks/python/apache_beam/pipeline_test.py | 37 +++ 1 file changed, 37 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d021e9ce/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index db3ad9e..a4c983f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -180,6 +180,43 @@ class PipelineTest(unittest.TestCase): ['a-x', 'b-x', 'c-x'], sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x'))) + def test_memory_usage(self): +try: + import resource +except ImportError: + # Skip the test if resource module is not available (e.g. non-Unix os). + self.skipTest('resource module not available.') + +def get_memory_usage_in_bytes(): + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * (2 ** 10) + +def check_memory(value, memory_threshold): + memory_usage = get_memory_usage_in_bytes() + if memory_usage > memory_threshold: +raise RuntimeError( +'High memory usage: %d > %d' % (memory_usage, memory_threshold)) + return value + +len_elements = 100 +num_elements = 10 +num_maps = 100 + +pipeline = Pipeline('DirectPipelineRunner') + +# Consumed memory should not be proportional to the number of maps. 
+memory_threshold = ( +get_memory_usage_in_bytes() + (3 * len_elements * num_elements)) + +biglist = pipeline | 'oom:create' >> Create( +['x' * len_elements] * num_elements) +for i in range(num_maps): + biglist = biglist | ('oom:addone-%d' % i) >> Map(lambda x: x + 'y') +result = biglist | 'oom:check' >> Map(check_memory, memory_threshold) +assert_that(result, equal_to( +['x' * len_elements + 'y' * num_maps] * num_elements)) + +pipeline.run() + def test_pipeline_as_context(self): def raise_exception(exn): raise exn