[1/2] incubator-beam git commit: Add support for date partitioned table names

2016-12-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk bb09c07b6 -> 409d067b3


Add support for date partitioned table names

These names have the format "tablename$YYYYmmdd".  Previously the dollar
sign caused such names to be deemed invalid.
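
For context, a minimal sketch of what this enables with the Python SDK of this era
(the dataset, table, and date below are placeholders; the '$YYYYmmdd' suffix selects
a single partition):

```python
import apache_beam as beam

p = beam.Pipeline()
# Read only the 2016-12-21 partition of a date-partitioned table.
rows = p | beam.io.Read(beam.io.BigQuerySource('my_dataset.events$20161221'))
p.run()
```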


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1af871a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1af871a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1af871a

Branch: refs/heads/python-sdk
Commit: a1af871a0c8c92a6d84f2e9950615f7737118d7e
Parents: bb09c07
Author: Kevin Graney 
Authored: Tue Dec 6 15:09:42 2016 -0500
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:16:45 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  | 6 --
 sdks/python/apache_beam/io/bigquery_test.py | 8 
 2 files changed, 12 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/bigquery.py b/sdks/python/apache_beam/io/bigquery.py
index ce75e10..2059de4 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -275,7 +275,9 @@ def _parse_table_reference(table, dataset=None, project=None):
   then the table argument must contain the entire table reference:
   'DATASET.TABLE' or 'PROJECT:DATASET.TABLE'. This argument can be a
   bigquery.TableReference instance in which case dataset and project are
-  ignored and the reference is returned as a result.
+  ignored and the reference is returned as a result.  Additionally, for date
+  partitioned tables, appending '$YYYYmmdd' to the table name is supported,
+  e.g. 'DATASET.TABLE$YYYYmmdd'.
 dataset: The ID of the dataset containing this table or null if the table
   reference is specified entirely by the table argument.
 project: The ID of the project containing this table or null if the table
@@ -300,7 +302,7 @@ def _parse_table_reference(table, dataset=None, project=None):
   # table name.
   if dataset is None:
 match = re.match(
-r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>\w+)$', table)
+r'^((?P<project>.+):)?(?P<dataset>\w+)\.(?P<table>[\w\$]+)$', table)
 if not match:
   raise ValueError(
   'Expected a table reference (PROJECT:DATASET.TABLE or '

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1af871a/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py b/sdks/python/apache_beam/io/bigquery_test.py
index a2cf947..f6f9363 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -208,6 +208,14 @@ class TestBigQuerySource(unittest.TestCase):
 self.assertEqual(source.query, 'my_query')
 self.assertIsNone(source.table_reference)
 
+  def test_date_partitioned_table_name(self):
+source = beam.io.BigQuerySource('dataset.table$20030102', validate=True)
+dd = DisplayData.create_from(source)
+expected_items = [
+DisplayDataItemMatcher('validation', True),
+DisplayDataItemMatcher('table', 'dataset.table$20030102')]
+hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
+
 
 class TestBigQuerySink(unittest.TestCase):
 



[2/2] incubator-beam git commit: Closes #1534

2016-12-21 Thread robertwb
Closes #1534


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/409d067b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/409d067b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/409d067b

Branch: refs/heads/python-sdk
Commit: 409d067b36036981e330a055b652bb74a93f4ca2
Parents: bb09c07 a1af871
Author: Robert Bradshaw 
Authored: Wed Dec 21 15:16:46 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:16:46 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  | 6 --
 sdks/python/apache_beam/io/bigquery_test.py | 8 
 2 files changed, 12 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Fixing inconsistencies in PipelineOptions

2016-12-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3454d691f -> bb09c07b6


Fixing inconsistencies in PipelineOptions

The following options have changed:

* job_name - Default is 'beamapp-username-date-microseconds'. Test was added.
* staging_location and temp_location - temp_location previously defaulted to
  staging_location; now staging_location defaults to temp_location, and the
  tests reflect that.
* machine_type alias of worker_machine_type has been removed.
* disk_type alias of worker_disk_type has been removed.
* disk_source_image option has been removed.
* no_save_main_session option has been removed.
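
As a rough illustration of the options after this change (a hedged sketch; the
project, job name, and bucket are placeholders):

```python
from apache_beam.utils.options import PipelineOptions

options = PipelineOptions([
    '--project=my-project',
    # If omitted, job_name now defaults to 'beamapp-<username>-<date>-<microseconds>'.
    '--job_name=my-job',
    '--temp_location=gs://my-bucket/tmp',
    # staging_location now defaults to temp_location (previously the reverse),
    # so it can be left out entirely.
    '--worker_machine_type=n1-standard-1',  # the 'machine_type' alias was removed
])
```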


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35e2fdc7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35e2fdc7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35e2fdc7

Branch: refs/heads/python-sdk
Commit: 35e2fdc7f22f20d74a569e86ced931209661dec1
Parents: 3454d69
Author: Pablo 
Authored: Tue Dec 6 18:01:54 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:14:52 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py   | 45 
 .../apache_beam/internal/apiclient_test.py  |  6 +++
 sdks/python/apache_beam/utils/options.py| 33 ++
 .../utils/pipeline_options_validator.py | 11 ++---
 .../utils/pipeline_options_validator_test.py|  8 ++--
 5 files changed, 54 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35e2fdc7/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index f1341a7..3a9ba46 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -18,6 +18,8 @@
 """Dataflow client utility functions."""
 
 import codecs
+from datetime import datetime
+import getpass
 import json
 import logging
 import os
@@ -46,10 +48,6 @@ from apache_beam.utils.options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
-BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
-COMPUTE_API_SERVICE = 'compute.googleapis.com'
-STORAGE_API_SERVICE = 'storage.googleapis.com'
-
 
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
@@ -121,11 +119,13 @@ class Environment(object):
 self.worker_options = options.view_as(WorkerOptions)
 self.debug_options = options.view_as(DebugOptions)
 self.proto = dataflow.Environment()
-self.proto.clusterManagerApiService = COMPUTE_API_SERVICE
-self.proto.dataset = '%s/cloud_dataflow' % BIGQUERY_API_SERVICE
+self.proto.clusterManagerApiService = GoogleCloudOptions.COMPUTE_API_SERVICE
+self.proto.dataset = '{}/cloud_dataflow'.format(
+GoogleCloudOptions.BIGQUERY_API_SERVICE)
 self.proto.tempStoragePrefix = (
-self.google_cloud_options.temp_location.replace('gs:/',
-STORAGE_API_SERVICE))
+self.google_cloud_options.temp_location.replace(
+'gs:/',
+GoogleCloudOptions.STORAGE_API_SERVICE))
 # User agent information.
 self.proto.userAgent = dataflow.Environment.UserAgentValue()
 self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
@@ -165,7 +165,7 @@ class Environment(object):
   dataflow.Package(
   location='%s/%s' % (
   self.google_cloud_options.staging_location.replace(
-  'gs:/', STORAGE_API_SERVICE),
+  'gs:/', GoogleCloudOptions.STORAGE_API_SERVICE),
   package),
   name=package))
 
@@ -174,7 +174,7 @@ class Environment(object):
 packages=package_descriptors,
 taskrunnerSettings=dataflow.TaskRunnerSettings(
 parallelWorkerSettings=dataflow.WorkerSettings(
-baseUrl='https://dataflow.googleapis.com',
+baseUrl=GoogleCloudOptions.DATAFLOW_ENDPOINT,
 servicePath=self.google_cloud_options.dataflow_endpoint)))
 pool.autoscalingSettings = dataflow.AutoscalingSettings()
 # Set worker pool options received through command line.
@@ -195,8 +195,6 @@ class Environment(object):
   pool.diskSizeGb = self.worker_options.disk_size_gb
 if self.worker_options.disk_type:
   pool.diskType = self.worker_options.disk_type
-if self.worker_options.disk_source_image:
-  pool.diskSourceImage = self.worker_options.disk_source_image
 if self.worker_options.zone:
   pool.zone = self.worker_options.zone
 if 

[2/2] incubator-beam git commit: Closes #1526

2016-12-21 Thread robertwb
Closes #1526


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/bb09c07b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/bb09c07b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/bb09c07b

Branch: refs/heads/python-sdk
Commit: bb09c07b6351dcc53c0bdc8bf1259261ad2edfba
Parents: 3454d69 35e2fdc
Author: Robert Bradshaw 
Authored: Wed Dec 21 15:15:20 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 21 15:15:20 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py   | 45 
 .../apache_beam/internal/apiclient_test.py  |  6 +++
 sdks/python/apache_beam/utils/options.py| 33 ++
 .../utils/pipeline_options_validator.py | 11 ++---
 .../utils/pipeline_options_validator_test.py|  8 ++--
 5 files changed, 54 insertions(+), 49 deletions(-)
--




[1/2] incubator-beam git commit: Update Apitools to version 0.5.6

2016-12-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e26527873 -> 3b4fd5c7d


Update Apitools to version 0.5.6

This brings in the fix to https://github.com/google/apitools/pull/136
needed for the BigQuery reader.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/63074312
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/63074312
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/63074312

Branch: refs/heads/python-sdk
Commit: 63074312aeb44b5db7f4e914c64864483f6a6510
Parents: e265278
Author: Sourabh Bajaj 
Authored: Sat Dec 3 09:02:32 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 15 17:11:58 2016 -0800

--
 sdks/python/setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/63074312/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 033afc7..f6357b6 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -86,7 +86,7 @@ else:
 REQUIRED_PACKAGES = [
 'avro>=1.7.7,<2.0.0',
 'dill>=0.2.5,<0.3',
-'google-apitools>=0.5.2,<1.0.0',
+'google-apitools>=0.5.6,<1.0.0',
 'googledatastore==6.4.1',
 'httplib2>=0.8,<0.10',
 'mock>=1.0.1,<3.0.0',



[2/2] incubator-beam git commit: Closes #1501

2016-12-15 Thread robertwb
Closes #1501


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b4fd5c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b4fd5c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b4fd5c7

Branch: refs/heads/python-sdk
Commit: 3b4fd5c7d962987405dc157e6b84788af61f6413
Parents: e265278 6307431
Author: Robert Bradshaw 
Authored: Thu Dec 15 17:12:55 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 15 17:12:55 2016 -0800

--
 sdks/python/setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Rename PTransform.apply() to PTransform.expand()

2016-12-15 Thread robertwb
est_combine_globally_with_default_side_input(self):
 class CombineWithSideInput(PTransform):
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 side = pcoll | CombineGlobally(sum).as_singleton_view()
 main = pcoll.pipeline | Create([None])
 return main | Map(lambda _, s: s, side)
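
For user code, the rename means composite transforms override expand() rather than
apply(); a minimal hedged sketch of a composite transform under the new name (the
class and its contents are illustrative only):

```python
import apache_beam as beam
from apache_beam.transforms.ptransform import PTransform


class CountPerElement(PTransform):
  # Before this change, the override point was named apply().
  def expand(self, pcoll):
    return (pcoll
            | beam.Map(lambda x: (x, 1))
            | beam.CombinePerKey(sum))
```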

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 523c5a6..0ba1c62 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -598,7 +598,7 @@ class ParDo(PTransformWithSideInputs):
   label='Transform Function'),
 'fn_dd': self.fn}
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 self.side_output_tags = set()
 # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
 self.dofn = self.fn
@@ -641,7 +641,7 @@ class _MultiParDo(PTransform):
 self._tags = tags
 self._main_tag = main_tag
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 _ = pcoll | self._do_transform
 return pvalue.DoOutputsTuple(
 pcoll.pipeline, self._do_transform, self._tags, self._main_tag)
@@ -854,7 +854,7 @@ class CombineGlobally(PTransform):
   def as_singleton_view(self):
 return self.clone(as_view=True)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 def add_input_types(transform):
   type_hints = self.get_type_hints()
   if type_hints.input_types:
@@ -939,7 +939,7 @@ class CombinePerKey(PTransformWithSideInputs):
   def process_argspec_fn(self):
 return self.fn._fn  # pylint: disable=protected-access
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 args, kwargs = util.insert_values_in_args(
 self.args, self.kwargs, self.side_inputs)
 return pcoll | GroupByKey() | CombineValues('Combine',
@@ -952,7 +952,7 @@ class CombineValues(PTransformWithSideInputs):
   def make_fn(self, fn):
 return fn if isinstance(fn, CombineFn) else CombineFn.from_callable(fn)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 args, kwargs = util.insert_values_in_args(
 self.args, self.kwargs, self.side_inputs)
 
@@ -1083,7 +1083,7 @@ class GroupByKey(PTransform):
   timer_window, name, time_domain, fire_time, state):
 yield wvalue.with_value((k, wvalue.value))
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 # This code path is only used in the local direct runner.  For Dataflow
 # runner execution, the GroupByKey transform is expanded on the service.
 input_type = pcoll.element_type
@@ -1132,7 +1132,7 @@ class GroupByKeyOnly(PTransform):
 key_type, value_type = trivial_inference.key_value_types(input_type)
 return KV[key_type, Iterable[value_type]]
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 self._check_pcollection(pcoll)
 return pvalue.PCollection(pcoll.pipeline)
 
@@ -1170,7 +1170,7 @@ class Partition(PTransformWithSideInputs):
   def make_fn(self, fn):
  return fn if isinstance(fn, PartitionFn) else CallableWrapperPartitionFn(fn)
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 n = int(self.args[0])
 return pcoll | ParDo(
 self.ApplyPartitionFnFn(), self.fn, *self.args,
@@ -1261,14 +1261,14 @@ class WindowInto(ParDo):
   def infer_output_type(self, input_type):
 return input_type
 
-  def apply(self, pcoll):
+  def expand(self, pcoll):
 input_type = pcoll.element_type
 
 if input_type is not None:
   output_type = input_type
   self.with_input_types(input_type)
   self.with_output_types(output_type)
-return super(WindowInto, self).apply(pcoll)
+return super(WindowInto, self).expand(pcoll)
 
 
 # Python's pickling is broken for nested classes.
@@ -1305,7 +1305,7 @@ class Flatten(PTransform):
   raise ValueError('Input to Flatten must be an iterable.')
 return pvalueish, pvalueish
 
-  def apply(self, pcolls):
+  def expand(self, pcolls):
 for pcoll in pcolls:
   self._check_pcollection(pcoll)
 return pvalue.PCollection(self.pipeline)
@@ -1345,7 +1345,7 @@ class Create(PTransform):
 else:
   return Union[[trivial_inference.instance_to_type(v) for v in self.value]]
 
-  def apply(self, pbegin):
+  def expand(self, pbegin):
 assert isinstance(pbegin, pvalue.PBegin)
 self.pipeline = pbegin.pipeline
 return pvalue.PCollection(self.pipeline)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e62249a1/sdks/python/apache_beam/transforms/ptransform.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 2212d00..1bd7fb4 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apa

[2/2] incubator-beam git commit: Closes #1634

2016-12-15 Thread robertwb
Closes #1634


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2652787
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2652787
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2652787

Branch: refs/heads/python-sdk
Commit: e2652787355d4c322138f55ae2c54494ec592e59
Parents: d3c8874 e62249a
Author: Robert Bradshaw 
Authored: Thu Dec 15 16:52:39 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 15 16:52:39 2016 -0800

--
 sdks/python/README.md   |  2 +-
 .../examples/complete/autocomplete.py   |  2 +-
 .../examples/complete/estimate_pi.py|  2 +-
 .../apache_beam/examples/complete/tfidf.py  |  2 +-
 .../examples/complete/top_wikipedia_sessions.py |  6 ++---
 .../examples/cookbook/custom_ptransform.py  |  2 +-
 .../examples/cookbook/multiple_output_pardo.py  |  2 +-
 .../apache_beam/examples/snippets/snippets.py   | 16 ++---
 .../examples/snippets/snippets_test.py  |  2 +-
 .../apache_beam/examples/wordcount_debugging.py |  2 +-
 sdks/python/apache_beam/io/avroio.py|  4 ++--
 .../apache_beam/io/datastore/v1/datastoreio.py  |  4 ++--
 sdks/python/apache_beam/io/iobase.py|  6 ++---
 sdks/python/apache_beam/io/textio.py|  4 ++--
 sdks/python/apache_beam/pipeline_test.py|  4 ++--
 .../runners/dataflow/native_io/iobase.py|  2 +-
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 sdks/python/apache_beam/runners/runner.py   |  4 ++--
 sdks/python/apache_beam/transforms/combiners.py | 14 ++--
 .../apache_beam/transforms/combiners_test.py|  2 +-
 sdks/python/apache_beam/transforms/core.py  | 24 ++--
 .../python/apache_beam/transforms/ptransform.py | 10 
 .../apache_beam/transforms/ptransform_test.py   |  6 ++---
 .../python/apache_beam/transforms/sideinputs.py | 10 
 sdks/python/apache_beam/transforms/util.py  |  4 ++--
 .../transforms/write_ptransform_test.py |  2 +-
 .../typehints/typed_pipeline_test.py|  2 +-
 27 files changed, 71 insertions(+), 71 deletions(-)
--




[2/2] incubator-beam git commit: Closes #1617

2016-12-15 Thread robertwb
Closes #1617


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d3c88748
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d3c88748
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d3c88748

Branch: refs/heads/python-sdk
Commit: d3c88748099fcccb27aef67c5c390d0bc67ebeb0
Parents: e383c77 0a558c7
Author: Robert Bradshaw 
Authored: Thu Dec 15 16:35:59 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 15 16:35:59 2016 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Update the BQ export flag from Json to Avro

2016-12-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e383c7715 -> d3c887480


Update the BQ export flag from Json to Avro


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a558c71
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a558c71
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a558c71

Branch: refs/heads/python-sdk
Commit: 0a558c7171d6e4452d88ecffd16a024a19cbfc42
Parents: e383c77
Author: Sourabh Bajaj 
Authored: Wed Dec 14 11:44:46 2016 -0800
Committer: Sourabh Bajaj 
Committed: Wed Dec 14 11:44:46 2016 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a558c71/sdks/python/apache_beam/runners/dataflow_runner.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow_runner.py
index 8b953b0..a3f7d94 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -520,7 +520,7 @@ class DataflowPipelineRunner(PipelineRunner):
 elif transform.source.format == 'text':
   step.add_property(PropertyNames.FILE_PATTERN, transform.source.path)
 elif transform.source.format == 'bigquery':
-  step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON')
+  step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
   # TODO(silviuc): Add table validation if transform.source.validate.
   if transform.source.table_reference is not None:
 step.add_property(PropertyNames.BIGQUERY_DATASET,



[2/2] incubator-beam git commit: Closes #1591

2016-12-13 Thread robertwb
Closes #1591


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e383c771
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e383c771
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e383c771

Branch: refs/heads/python-sdk
Commit: e383c77151bcfb61e3394c5dd040a425aa246bec
Parents: f086afe e36ee8d
Author: Robert Bradshaw 
Authored: Tue Dec 13 11:09:13 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Dec 13 11:09:13 2016 -0800

--
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Do not test pickling native sink objects

2016-12-13 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk f086afe12 -> e383c7715


Do not test pickling native sink objects


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e36ee8d7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e36ee8d7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e36ee8d7

Branch: refs/heads/python-sdk
Commit: e36ee8d797965e2e2101cce046a14399f2008fc6
Parents: f086afe
Author: Ahmet Altay 
Authored: Mon Dec 12 19:09:37 2016 -0800
Committer: Ahmet Altay 
Committed: Mon Dec 12 19:09:37 2016 -0800

--
 sdks/python/apache_beam/runners/direct/transform_evaluator.py | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e36ee8d7/sdks/python/apache_beam/runners/direct/transform_evaluator.py
--
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index 7a9a31f..24ab754 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -505,8 +505,7 @@ class _NativeWriteEvaluator(_TransformEvaluator):
 side_inputs)
 
 assert applied_ptransform.transform.sink
-# TODO(aaltay): Consider storing the serialized form as an optimization.
-self._sink = pickler.loads(pickler.dumps(applied_ptransform.transform.sink))
+self._sink = applied_ptransform.transform.sink
 
   @property
   def _is_final_bundle(self):



[1/2] incubator-beam git commit: [BEAM-1124] Temporarily Ignore a ValidatesRunnerTest That Broke Postcommit

2016-12-12 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b265dceaa -> f086afe12


[BEAM-1124] Temporarily Ignore a ValidatesRunnerTest That Broke Postcommit


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0a70e581
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0a70e581
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0a70e581

Branch: refs/heads/python-sdk
Commit: 0a70e581bef4258dfb18f3c5db8f9a9369ab13e8
Parents: b265dce
Author: Mark Liu 
Authored: Fri Dec 9 16:55:45 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Dec 12 14:19:57 2016 -0800

--
 sdks/python/apache_beam/dataflow_test.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0a70e581/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index ba3553a..f410230 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -176,7 +176,9 @@ class DataflowTest(unittest.TestCase):
 assert_that(result, equal_to([(1, 'empty'), (2, 'empty')]))
 pipeline.run()
 
-  @attr('ValidatesRunner')
+  # @attr('ValidatesRunner')
+  # TODO(BEAM-1124): Temporarily disable it due to test failed running on
+  # Dataflow service.
   def test_multi_valued_singleton_side_input(self):
 pipeline = TestPipeline()
 pcol = pipeline | 'start' >> Create([1, 2])



[2/2] incubator-beam git commit: Closes #1571

2016-12-12 Thread robertwb
Closes #1571


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f086afe1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f086afe1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f086afe1

Branch: refs/heads/python-sdk
Commit: f086afe125cc413e7626ab186268d5bb3c067a89
Parents: b265dce 0a70e58
Author: Robert Bradshaw 
Authored: Mon Dec 12 14:19:58 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Dec 12 14:19:58 2016 -0800

--
 sdks/python/apache_beam/dataflow_test.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Add more documentation to datastore_wordcount example

2016-12-09 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 8eae855d6 -> b265dceaa


Add more documentation to datastore_wordcount example


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/62b8095e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/62b8095e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/62b8095e

Branch: refs/heads/python-sdk
Commit: 62b8095e7164a316b8ae93c7fefa41d38ee255a8
Parents: 8eae855
Author: Vikas Kedigehalli 
Authored: Wed Dec 7 14:14:41 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 9 11:29:02 2016 -0800

--
 .../examples/cookbook/datastore_wordcount.py| 46 +++-
 1 file changed, 44 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/62b8095e/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
index eb62614..9613402 100644
--- a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -15,7 +15,49 @@
 # limitations under the License.
 #
 
-"""A word-counting workflow that uses Google Cloud Datastore."""
+"""A word-counting workflow that uses Google Cloud Datastore.
+
+This example shows how to use ``datastoreio`` to read from and write to
+Google Cloud Datastore. Note that running this example may incur charge for
+Cloud Datastore operations.
+
+See https://developers.google.com/datastore/ for more details on Google Cloud
+Datastore.
+See http://beam.incubator.apache.org/get-started/quickstart on
+how to run a Beam pipeline.
+
+Read-only Mode: In this mode, this example reads Cloud Datastore entities using
+the ``datastoreio.ReadFromDatastore`` transform, extracts the words,
+counts them and write the output to a set of files.
+
+The following options must be provided to run this pipeline in read-only mode:
+``
+--project YOUR_PROJECT_ID
+--kind YOUR_DATASTORE_KIND
+--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
+--read-only
+``
+
+Read-write Mode: In this mode, this example reads words from an input file,
+converts them to Cloud Datastore ``Entity`` objects and writes them to
+Cloud Datastore using the ``datastoreio.Write`` transform. The second pipeline
+will then read these Cloud Datastore entities using the
+``datastoreio.ReadFromDatastore`` transform, extract the words, count them and
+write the output to a set of files.
+
+The following options must be provided to run this pipeline in read-write mode:
+``
+--project YOUR_PROJECT_ID
+--kind YOUR_DATASTORE_KIND
+--output [YOUR_LOCAL_FILE *or* gs://YOUR_OUTPUT_PATH]
+``
+
+Note: We are using the Cloud Datastore protobuf objects directly because
+that is the interface that the ``datastoreio`` exposes.
+See the following links on more information about these protobuf messages.
+https://cloud.google.com/datastore/docs/reference/rpc/google.datastore.v1 and
+https://github.com/googleapis/googleapis/tree/master/google/datastore/v1
+"""
 
 from __future__ import absolute_import
 
@@ -196,7 +238,7 @@ def run(argv=None):
   if not known_args.read_only:
 write_to_datastore(gcloud_options.project, known_args, pipeline_options)
 
-  # Read from Datastore.
+  # Read entities from Datastore.
   result = read_from_datastore(gcloud_options.project, known_args,
pipeline_options)
 



[2/2] incubator-beam git commit: Closes #1540

2016-12-09 Thread robertwb
Closes #1540


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b265dcea
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b265dcea
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b265dcea

Branch: refs/heads/python-sdk
Commit: b265dceaac93a50543151efb4c3168a8275e8b2d
Parents: 8eae855 62b8095
Author: Robert Bradshaw 
Authored: Fri Dec 9 11:29:03 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 9 11:29:03 2016 -0800

--
 .../examples/cookbook/datastore_wordcount.py| 46 +++-
 1 file changed, 44 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Closes #1551

2016-12-09 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 44c1586f3 -> 8eae855d6


Closes #1551


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8eae855d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8eae855d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8eae855d

Branch: refs/heads/python-sdk
Commit: 8eae855d608abb140c0e3ece3927765575b81cbb
Parents: 44c1586 2dee686
Author: Robert Bradshaw 
Authored: Fri Dec 9 11:28:02 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 9 11:28:02 2016 -0800

--
 sdks/python/run_postcommit.sh | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)
--




[2/2] incubator-beam git commit: [BEAM-1109] Fix Python Postcommit Test Timeout

2016-12-09 Thread robertwb
[BEAM-1109] Fix Python Postcommit Test Timeout


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2dee6868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2dee6868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2dee6868

Branch: refs/heads/python-sdk
Commit: 2dee68683ad6d18161d22cd71f8d90768d7fcb30
Parents: 44c1586
Author: Mark Liu 
Authored: Thu Dec 8 00:06:06 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 9 11:28:02 2016 -0800

--
 sdks/python/run_postcommit.sh | 5 +
 1 file changed, 1 insertion(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2dee6868/sdks/python/run_postcommit.sh
--
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 4da078f..2e00a03 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -71,11 +71,8 @@ python setup.py sdist
 SDK_LOCATION=$(find dist/apache-beam-sdk-*.tar.gz)
 
 # Run ValidatesRunner tests on Google Cloud Dataflow service
-# processes   -> number of processes to run tests in parallel
-# process-timeout -> test timeout in seconds
 python setup.py nosetests \
-  -a ValidatesRunner --processes=4 --process-timeout=360 \
-  --test-pipeline-options=" \
+  -a ValidatesRunner --test-pipeline-options=" \
 --runner=BlockingDataflowPipelineRunner \
 --project=$PROJECT \
 --staging_location=$GCS_LOCATION/staging-validatesrunner-test \



[2/2] incubator-beam git commit: Fix a typo in query split error handling

2016-12-08 Thread robertwb
Fix a typo in query split error handling


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d6afb906
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d6afb906
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d6afb906

Branch: refs/heads/python-sdk
Commit: d6afb90690f5be2f3cb38d68dc1d49a1b551118e
Parents: 1392f70
Author: Vikas Kedigehalli 
Authored: Wed Dec 7 14:19:26 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 8 11:22:31 2016 -0800

--
 .../apache_beam/io/datastore/v1/datastoreio.py  |  2 +-
 .../io/datastore/v1/datastoreio_test.py | 29 
 2 files changed, 30 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
--
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index fc3e813..a86bb0b 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -181,7 +181,7 @@ class ReadFromDatastore(PTransform):
   except Exception:
 logging.warning("Unable to parallelize the given query: %s", query,
 exc_info=True)
-query_splits = [(key, query)]
+query_splits = [query]
 
   sharded_query_splits = []
   for split_query in query_splits:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d6afb906/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
--
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
index 2ac7ffb..f80a320 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio_test.py
@@ -122,6 +122,35 @@ class DatastoreioTest(unittest.TestCase):
   self.assertEqual(1, len(returned_split_queries))
   self.assertEqual(0, len(self._mock_datastore.method_calls))
 
+  def test_SplitQueryFn_with_exception(self):
+"""A test that verifies that no split is performed when failures occur."""
+with patch.object(helper, 'get_datastore',
+  return_value=self._mock_datastore):
+  # Force SplitQueryFn to compute the number of query splits
+  num_splits = 0
+  expected_num_splits = 1
+  entity_bytes = (expected_num_splits *
+  ReadFromDatastore._DEFAULT_BUNDLE_SIZE_BYTES)
+  with patch.object(ReadFromDatastore, 'get_estimated_size_bytes',
+return_value=entity_bytes):
+
+with patch.object(query_splitter, 'get_splits',
+  side_effect=ValueError("Testing query split error")):
+  split_query_fn = ReadFromDatastore.SplitQueryFn(
+  self._PROJECT, self._query, None, num_splits)
+  mock_context = MagicMock()
+  mock_context.element = self._query
+  split_query_fn.start_bundle(mock_context)
+  returned_split_queries = []
+  for split_query in split_query_fn.process(mock_context):
+returned_split_queries.append(split_query)
+
+  self.assertEqual(len(returned_split_queries), expected_num_splits)
+  self.assertEqual(returned_split_queries[0][1], self._query)
+  self.assertEqual(0,
+   len(self._mock_datastore.run_query.call_args_list))
+  self.verify_unique_keys(returned_split_queries)
+
   def test_DatastoreWriteFn_with_emtpy_batch(self):
 self.check_DatastoreWriteFn(0)
 



[1/2] incubator-beam git commit: Closes #1542

2016-12-08 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 1392f70b6 -> 44c1586f3


Closes #1542


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/44c1586f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/44c1586f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/44c1586f

Branch: refs/heads/python-sdk
Commit: 44c1586f3815f957a43037309f2a46a1766c6516
Parents: 1392f70 d6afb90
Author: Robert Bradshaw 
Authored: Thu Dec 8 11:22:31 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 8 11:22:31 2016 -0800

--
 .../apache_beam/io/datastore/v1/datastoreio.py  |  2 +-
 .../io/datastore/v1/datastoreio_test.py | 29 
 2 files changed, 30 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Handle empty batches in GcsIO batch methods

2016-12-08 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 75be6e974 -> 1392f70b6


Handle empty batches in GcsIO batch methods


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ef83b33
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ef83b33
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ef83b33

Branch: refs/heads/python-sdk
Commit: 3ef83b3396a4574b3283b29ba1f878b31badd612
Parents: 75be6e9
Author: Charles Chen 
Authored: Wed Dec 7 15:03:01 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 8 11:18:51 2016 -0800

--
 sdks/python/apache_beam/io/gcsio.py  | 4 
 sdks/python/apache_beam/io/gcsio_test.py | 4 
 2 files changed, 8 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ef83b33/sdks/python/apache_beam/io/gcsio.py
--
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 748465f..f150c4c 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -204,6 +204,8 @@ class GcsIO(object):
  argument, where exception is None if the operation succeeded or
  the relevant exception if the operation failed.
 """
+if not paths:
+  return []
 batch_request = BatchApiRequest(
 retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
 for path in paths:
@@ -264,6 +266,8 @@ class GcsIO(object):
  src_dest_pairs argument, where exception is None if the operation
  succeeded or the relevant exception if the operation failed.
 """
+if not src_dest_pairs:
+  return []
 batch_request = BatchApiRequest(
 retryable_codes=retry.SERVER_ERROR_OR_TIMEOUT_CODES)
 for src, dest in src_dest_pairs:

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ef83b33/sdks/python/apache_beam/io/gcsio_test.py
--
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 5af13c6..bd7eb51 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -265,6 +265,10 @@ class TestGCSIO(unittest.TestCase):
 with self.assertRaises(ValueError):
   self.gcs.open(file_name, 'r+b')
 
+  def test_empty_batches(self):
+self.assertEqual([], self.gcs.copy_batch([]))
+self.assertEqual([], self.gcs.delete_batch([]))
+
   def test_delete(self):
 file_name = 'gs://gcsio-test/delete_me'
 file_size = 1024
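
The effect of the new guard, as a hedged usage sketch mirroring the unit test
(assumes default GCS credentials when constructing the client):

```python
from apache_beam.io.gcsio import GcsIO

gcs = GcsIO()
# With the fix, empty batches short-circuit and return an empty result list
# instead of issuing a BatchApiRequest with no entries.
assert gcs.delete_batch([]) == []
assert gcs.copy_batch([]) == []
```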



[2/2] incubator-beam git commit: Closes #1544

2016-12-08 Thread robertwb
Closes #1544


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1392f70b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1392f70b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1392f70b

Branch: refs/heads/python-sdk
Commit: 1392f70b6ee2db76e333fad16efe0cfdd00e7175
Parents: 75be6e9 3ef83b3
Author: Robert Bradshaw 
Authored: Thu Dec 8 11:18:52 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 8 11:18:52 2016 -0800

--
 sdks/python/apache_beam/io/gcsio.py  | 4 
 sdks/python/apache_beam/io/gcsio_test.py | 4 
 2 files changed, 8 insertions(+)
--




[1/2] incubator-beam git commit: Add reference to the >> and | operators for pipelines.

2016-12-08 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 43057960a -> 75be6e974


Add reference to the >> and | operators for pipelines.
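
As a concrete illustration of the wording being added, a minimal hedged sketch
(the labels and element values are arbitrary):

```python
import apache_beam as beam

p = beam.Pipeline()
(p
 | 'Create words' >> beam.Create(['to', 'be', 'or', 'not', 'to', 'be'])  # '>>' labels a PTransform
 | 'Pair with one' >> beam.Map(lambda w: (w, 1))                         # '|' chains PTransforms
 | 'Count per word' >> beam.CombinePerKey(sum))
p.run()
```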


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3510ff99
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3510ff99
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3510ff99

Branch: refs/heads/python-sdk
Commit: 3510ff99da9cd149e67e8fdb12b942689374b2d7
Parents: 4305796
Author: Maria Garcia Herrero 
Authored: Tue Dec 6 21:05:47 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 8 11:14:51 2016 -0800

--
 sdks/python/README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3510ff99/sdks/python/README.md
--
diff --git a/sdks/python/README.md b/sdks/python/README.md
index cff497c..820084d 100644
--- a/sdks/python/README.md
+++ b/sdks/python/README.md
@@ -163,8 +163,9 @@ The following examples demonstrate some basic, fundamental concepts for using Ap
 
 A basic pipeline will take as input an iterable, apply the
 beam.Create `PTransform`, and produce a `PCollection` that can
-be written to a file or modified by further `PTransform`s. The
-pipe operator allows to chain `PTransform`s.
+be written to a file or modified by further `PTransform`s.
+The `>>` operator is used to label `PTransform`s and
+the `|` operator is used to chain them.
 
 ```python
 # Standard imports



[2/2] incubator-beam git commit: Closes #1521

2016-12-08 Thread robertwb
Closes #1521


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/75be6e97
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/75be6e97
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/75be6e97

Branch: refs/heads/python-sdk
Commit: 75be6e974831f73ea935fe1f52fd7091a03c8928
Parents: 4305796 3510ff9
Author: Robert Bradshaw 
Authored: Thu Dec 8 11:14:52 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 8 11:14:52 2016 -0800

--
 sdks/python/README.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Fix template_runner_test on Windows

2016-12-07 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 4a660c604 -> 43057960a


Fix template_runner_test on Windows


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a565ca10
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a565ca10
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a565ca10

Branch: refs/heads/python-sdk
Commit: a565ca1008309564dba41d551c68ea553cd83a7b
Parents: 4a660c6
Author: Charles Chen 
Authored: Wed Dec 7 16:09:49 2016 -0800
Committer: Charles Chen 
Committed: Wed Dec 7 16:09:49 2016 -0800

--
 .../apache_beam/runners/template_runner_test.py   | 14 ++
 1 file changed, 10 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a565ca10/sdks/python/apache_beam/runners/template_runner_test.py
--
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
index a141521..cc3d7c2 100644
--- a/sdks/python/apache_beam/runners/template_runner_test.py
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -33,24 +33,30 @@ from apache_beam.internal import apiclient
 class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
   """TemplatingDataflow tests."""
   def test_full_completion(self):
-dummy_file = tempfile.NamedTemporaryFile()
+# Create dummy file and close it.  Note that we need to do this because
+# Windows does not allow NamedTemporaryFiles to be reopened elsewhere
+# before the temporary file is closed.
+dummy_file = tempfile.NamedTemporaryFile(delete=False)
+dummy_file_name = dummy_file.name
+dummy_file.close()
+
 dummy_dir = tempfile.mkdtemp()
 
 remote_runner = DataflowPipelineRunner()
 pipeline = Pipeline(remote_runner,
 options=PipelineOptions([
 '--dataflow_endpoint=ignored',
-'--sdk_location=' + dummy_file.name,
+'--sdk_location=' + dummy_file_name,
 '--job_name=test-job',
 '--project=test-project',
 '--staging_location=' + dummy_dir,
 '--temp_location=/dev/null',
-'--template_location=' + dummy_file.name,
+'--template_location=' + dummy_file_name,
 '--no_auth=True']))
 
 pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
 pipeline.run()
-with open(dummy_file.name) as template_file:
+with open(dummy_file_name) as template_file:
   saved_job_dict = json.load(template_file)
   self.assertEqual(
   saved_job_dict['environment']['sdkPipelineOptions']
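
The underlying pattern, shown as a generic Python sketch independent of Beam (on
Windows a NamedTemporaryFile cannot be reopened by name while it is still open):

```python
import json
import tempfile

# Create the file with delete=False, remember its name, and close it so that
# it can be reopened by name later (required on Windows).
tmp = tempfile.NamedTemporaryFile(delete=False)
tmp_name = tmp.name
tmp.close()

with open(tmp_name, 'w') as f:
    json.dump({'status': 'ok'}, f)

with open(tmp_name) as f:
    print(json.load(f))
```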



[2/2] incubator-beam git commit: Closes #1548

2016-12-07 Thread robertwb
Closes #1548


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/43057960
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/43057960
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/43057960

Branch: refs/heads/python-sdk
Commit: 43057960af906ff5c435ea016d5f6df5bccaea40
Parents: 4a660c6 a565ca1
Author: Robert Bradshaw 
Authored: Wed Dec 7 18:17:39 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 7 18:17:39 2016 -0800

--
 .../apache_beam/runners/template_runner_test.py   | 14 ++
 1 file changed, 10 insertions(+), 4 deletions(-)
--




[1/2] incubator-beam git commit: Closes #1492

2016-12-07 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d5e8c79a3 -> 4a660c604


Closes #1492


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a660c60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a660c60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a660c60

Branch: refs/heads/python-sdk
Commit: 4a660c604a0b21f685d87b8ecc008aeb13bb4049
Parents: d5e8c79 c7626ad
Author: Robert Bradshaw 
Authored: Wed Dec 7 12:14:38 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Dec 7 12:14:38 2016 -0800

--
 sdks/python/run_postcommit.sh | 36 ++--
 1 file changed, 26 insertions(+), 10 deletions(-)
--




[2/2] incubator-beam git commit: Closes #1485

2016-12-06 Thread robertwb
Closes #1485


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f19f767b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f19f767b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f19f767b

Branch: refs/heads/python-sdk
Commit: f19f767b09275bdea325bb37a3767d96eeacd4a0
Parents: 6dcc429 aef4858
Author: Robert Bradshaw 
Authored: Tue Dec 6 10:14:11 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Dec 6 10:14:11 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py | 19 +--
 sdks/python/apache_beam/internal/pickler.py   |  8 
 2 files changed, 25 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Fix the pickle issue with the inconsistency of dill load and dump session

2016-12-06 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 6dcc429e5 -> f19f767b0


Fix the pickle issue with the inconsistency of dill load and dump session


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aef4858b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aef4858b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aef4858b

Branch: refs/heads/python-sdk
Commit: aef4858b80dfacf3401e6672b9373c82a8e77027
Parents: 6dcc429
Author: Sourabh Bajaj 
Authored: Fri Dec 2 15:02:18 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Dec 6 10:14:10 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py | 19 +--
 sdks/python/apache_beam/internal/pickler.py   |  8 
 2 files changed, 25 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index c5f5f70..f1341a7 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -46,7 +46,6 @@ from apache_beam.utils.options import WorkerOptions
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
-
 BIGQUERY_API_SERVICE = 'bigquery.googleapis.com'
 COMPUTE_API_SERVICE = 'compute.googleapis.com'
 STORAGE_API_SERVICE = 'storage.googleapis.com'
@@ -55,13 +54,19 @@ STORAGE_API_SERVICE = 'storage.googleapis.com'
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
 
-  def __init__(self, step_kind, step_name):
+  def __init__(self, step_kind, step_name, additional_properties=None):
 self.step_kind = step_kind
 self.step_name = step_name
 self.proto = dataflow.Step(kind=step_kind, name=step_name)
 self.proto.properties = {}
+self._additional_properties = []
+
+if additional_properties is not None:
+  for (n, v, t) in additional_properties:
+self.add_property(n, v, t)
 
   def add_property(self, name, value, with_type=False):
+self._additional_properties.append((name, value, with_type))
 self.proto.properties.additionalProperties.append(
 dataflow.Step.PropertiesValue.AdditionalProperty(
 key=name, value=to_json_value(value, with_type=with_type)))
@@ -77,6 +82,11 @@ class Step(object):
   outputs.append(entry_prop.value.string_value)
 return outputs
 
+  def __reduce__(self):
+"""Reduce hook for pickling the Step class more easily
+"""
+return (Step, (self.step_kind, self.step_name, self._additional_properties))
+
   def get_output(self, tag=None):
 """Returns name if it is one of the outputs or first output if name is 
None.
 
@@ -330,6 +340,11 @@ class Job(object):
   def json(self):
 return encoding.MessageToJson(self.proto)
 
+  def __reduce__(self):
+"""Reduce hook for pickling the Job class more easily
+"""
+return (Job, (self.options,))
+
 
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query 
jobs."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/aef4858b/sdks/python/apache_beam/internal/pickler.py
--
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index 30f0b77..d39a497 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -204,6 +204,14 @@ def loads(encoded):
 
 
 def dump_session(file_path):
+  """Pickle the current python session to be used in the worker.
+
+  Note: Due to the inconsistency in the first dump of dill dump_session we
+  create and load the dump twice to have consistent results in the worker and
+  the running session. Check: https://github.com/uqfoundation/dill/issues/195
+  """
+  dill.dump_session(file_path)
+  dill.load_session(file_path)
   return dill.dump_session(file_path)
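
For background on the __reduce__ hooks added above: __reduce__ tells pickle how to
rebuild an object from a constructor call. A generic sketch, not the Beam classes
themselves:

```python
import pickle


class Config(object):
  def __init__(self, name, values=None):
    self.name = name
    self.values = list(values or [])

  def __reduce__(self):
    # On unpickling, recreate the object via Config(name, values).
    return (Config, (self.name, self.values))


restored = pickle.loads(pickle.dumps(Config('job', [1, 2, 3])))
print(restored.name, restored.values)  # -> job [1, 2, 3]
```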
 
 



[2/2] incubator-beam git commit: Closes #1512

2016-12-06 Thread robertwb
Closes #1512


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6dcc429e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6dcc429e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6dcc429e

Branch: refs/heads/python-sdk
Commit: 6dcc429e5bf4bb37605773be6de31efa3f887093
Parents: e73bdb5 4a02a68
Author: Robert Bradshaw 
Authored: Tue Dec 6 10:10:45 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Dec 6 10:10:45 2016 -0800

--
 .../apache_beam/runners/template_runner_test.py | 82 +++
 sdks/python/apache_beam/template_runner_test.py | 83 
 2 files changed, 82 insertions(+), 83 deletions(-)
--




[1/2] incubator-beam git commit: Move template_runners_test to runners folder.

2016-12-06 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e73bdb5c2 -> 6dcc429e5


Move template_runners_test to runners folder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a02a688
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a02a688
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a02a688

Branch: refs/heads/python-sdk
Commit: 4a02a688dbacd640e40d7b3d3ca268fde9806e73
Parents: e73bdb5
Author: Ahmet Altay 
Authored: Mon Dec 5 14:57:14 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Dec 6 10:10:44 2016 -0800

--
 .../apache_beam/runners/template_runner_test.py | 82 +++
 sdks/python/apache_beam/template_runner_test.py | 83 
 2 files changed, 82 insertions(+), 83 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a02a688/sdks/python/apache_beam/runners/template_runner_test.py
--
diff --git a/sdks/python/apache_beam/runners/template_runner_test.py b/sdks/python/apache_beam/runners/template_runner_test.py
new file mode 100644
index 000..a141521
--- /dev/null
+++ b/sdks/python/apache_beam/runners/template_runner_test.py
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for templated pipelines."""
+
+from __future__ import absolute_import
+
+import json
+import unittest
+import tempfile
+
+import apache_beam as beam
+from apache_beam.pipeline import Pipeline
+from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.internal import apiclient
+
+
+class TemplatingDataflowPipelineRunnerTest(unittest.TestCase):
+  """TemplatingDataflow tests."""
+  def test_full_completion(self):
+dummy_file = tempfile.NamedTemporaryFile()
+dummy_dir = tempfile.mkdtemp()
+
+remote_runner = DataflowPipelineRunner()
+pipeline = Pipeline(remote_runner,
+options=PipelineOptions([
+'--dataflow_endpoint=ignored',
+'--sdk_location=' + dummy_file.name,
+'--job_name=test-job',
+'--project=test-project',
+'--staging_location=' + dummy_dir,
+'--temp_location=/dev/null',
+'--template_location=' + dummy_file.name,
+'--no_auth=True']))
+
+pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x) # pylint: disable=expression-not-assigned
+pipeline.run()
+with open(dummy_file.name) as template_file:
+  saved_job_dict = json.load(template_file)
+  self.assertEqual(
+  saved_job_dict['environment']['sdkPipelineOptions']
+  ['options']['project'], 'test-project')
+  self.assertEqual(
+  saved_job_dict['environment']['sdkPipelineOptions']
+  ['options']['job_name'], 'test-job')
+
+  def test_bad_path(self):
+dummy_sdk_file = tempfile.NamedTemporaryFile()
+remote_runner = DataflowPipelineRunner()
+pipeline = Pipeline(remote_runner,
+options=PipelineOptions([
+'--dataflow_endpoint=ignored',
+'--sdk_location=' + dummy_sdk_file.name,
+'--job_name=test-job',
+'--project=test-project',
+'--staging_location=ignored',
+'--temp_location=/dev/null',
+'--template_location=/bad/path',
+'--no_auth=True']))
+remote_runner.job = apiclient.Job(pipeline.options)
+
+with self.assertRaises(IOError):
+  pipeline.run()
+
+
+if __name__ == '__main__':
+  unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a02a688/sdks/python/apache_beam/template_runner_test.py

[1/2] incubator-beam git commit: Change export format to AVRO for BQ

2016-12-05 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d59bccd82 -> eb98d636e


Change export format to AVRO for BQ


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/390fbfdd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/390fbfdd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/390fbfdd

Branch: refs/heads/python-sdk
Commit: 390fbfddf444df410f7cb61329496f6f24a0532c
Parents: d59bccd
Author: Sourabh Bajaj 
Authored: Mon Dec 5 14:04:54 2016 -0800
Committer: Sourabh Bajaj 
Committed: Mon Dec 5 14:04:54 2016 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/390fbfdd/sdks/python/apache_beam/runners/dataflow_runner.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow_runner.py
index 8b953b0..a3f7d94 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -520,7 +520,7 @@ class DataflowPipelineRunner(PipelineRunner):
 elif transform.source.format == 'text':
   step.add_property(PropertyNames.FILE_PATTERN, transform.source.path)
 elif transform.source.format == 'bigquery':
-  step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON')
+  step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_AVRO')
   # TODO(silviuc): Add table validation if transform.source.validate.
   if transform.source.table_reference is not None:
 step.add_property(PropertyNames.BIGQUERY_DATASET,



[2/2] incubator-beam git commit: Closes #1510

2016-12-05 Thread robertwb
Closes #1510


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/eb98d636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/eb98d636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/eb98d636

Branch: refs/heads/python-sdk
Commit: eb98d636ec9bbe11795aa9ee2fea1a8ceddaf794
Parents: d59bccd 390fbfd
Author: Robert Bradshaw 
Authored: Mon Dec 5 17:10:34 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Dec 5 17:10:34 2016 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[2/2] incubator-beam git commit: Closes #1509

2016-12-05 Thread robertwb
Closes #1509


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d59bccd8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d59bccd8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d59bccd8

Branch: refs/heads/python-sdk
Commit: d59bccd82461d340613a16ab41db2a4cc6e4200b
Parents: f7118c8 f1b83f7
Author: Robert Bradshaw 
Authored: Mon Dec 5 13:01:19 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Dec 5 13:01:19 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Add missing job parameter to the submit_job_description.

2016-12-05 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk f7118c8a5 -> d59bccd82


Add missing job parameter to the submit_job_description.

Tested by running the post-commit test locally.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1b83f7e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1b83f7e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1b83f7e

Branch: refs/heads/python-sdk
Commit: f1b83f7e82b56cd36bffbcdd5cc8ab319bf1e9d3
Parents: f7118c8
Author: Ahmet Altay 
Authored: Mon Dec 5 12:29:45 2016 -0800
Committer: Ahmet Altay 
Committed: Mon Dec 5 12:29:45 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1b83f7e/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py 
b/sdks/python/apache_beam/internal/apiclient.py
index a894557..c5f5f70 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -412,7 +412,7 @@ class DataflowApplicationClient(object):
   self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
 
 if not template_location:
-  return self.submit_job_description()
+  return self.submit_job_description(job)
 else:
   return None
 
@@ -426,7 +426,7 @@ class DataflowApplicationClient(object):
 # TODO(silviuc): Remove the debug logging eventually.
 logging.info('JOB: %s', job)
 
-  def submit_job_description(self):
+  def submit_job_description(self, job):
 """Creates and excutes a job request."""
 request = dataflow.DataflowProjectsJobsCreateRequest()
 request.projectId = self.google_cloud_options.project



[1/2] incubator-beam git commit: Modify create_job to allow staging the job and not submitting it to the service.

2016-12-05 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 7c5e4aa66 -> 0d99856f3


Modify create_job to allow staging the job and not submitting it to the service.

- Modularize create_job into create job description, stage job, and send for execution.
- Modify --dataflow_job_file to stage the job and continue submitting it to the service.
- Add --template_location to stage the job but not submit it to the service (see the sketch below).
- Add tests for both, including making them mutually exclusive (following the Java SDK decision).
- Add template_runner_test.py with integration tests.
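
A minimal sketch of how the two staging flags above might be used; it mirrors the integration test added in this commit, and the project, bucket, and path values are illustrative placeholders only.

    import apache_beam as beam
    from apache_beam.pipeline import Pipeline
    from apache_beam.runners.dataflow_runner import DataflowPipelineRunner
    from apache_beam.utils.options import PipelineOptions

    # With --template_location set, run() stages the job description instead
    # of submitting it to the service.
    pipeline = Pipeline(DataflowPipelineRunner(),
                        options=PipelineOptions([
                            '--project=my-project',
                            '--job_name=my-job',
                            '--staging_location=gs://my-bucket/staging',
                            '--temp_location=gs://my-bucket/temp',
                            '--template_location=gs://my-bucket/templates/my-template']))
    pipeline | beam.Create([1, 2, 3]) | beam.Map(lambda x: x)  # pylint: disable=expression-not-assigned
    pipeline.run()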


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cfa0ad81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cfa0ad81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cfa0ad81

Branch: refs/heads/python-sdk
Commit: cfa0ad8136b323bade9de14ea6149e7f74cbd0b4
Parents: 7c5e4aa
Author: Maria Garcia Herrero 
Authored: Wed Nov 2 09:14:48 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Dec 5 11:04:34 2016 -0800

--
 sdks/python/apache_beam/examples/wordcount.py   |  1 -
 sdks/python/apache_beam/internal/apiclient.py   | 34 +++-
 .../apache_beam/runners/dataflow_runner.py  | 13 ++-
 sdks/python/apache_beam/template_runner_test.py | 83 
 sdks/python/apache_beam/utils/options.py| 10 +++
 .../apache_beam/utils/pipeline_options_test.py  | 13 +++
 .../utils/pipeline_options_validator_test.py| 28 +++
 7 files changed, 175 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/examples/wordcount.py
--
diff --git a/sdks/python/apache_beam/examples/wordcount.py 
b/sdks/python/apache_beam/examples/wordcount.py
index 096e508..7f347d8 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -59,7 +59,6 @@ class WordExtractingDoFn(beam.DoFn):
 
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
-
   parser = argparse.ArgumentParser()
   parser.add_argument('--input',
   dest='input',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cfa0ad81/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py 
b/sdks/python/apache_beam/internal/apiclient.py
index 5612631..a894557 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -24,6 +24,7 @@ import os
 import re
 import time
 
+from StringIO import StringIO
 
 from apitools.base.py import encoding
 from apitools.base.py import exceptions
@@ -42,7 +43,6 @@ from apache_beam.utils.options import DebugOptions
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
 from apache_beam.utils.options import WorkerOptions
-
 from apache_beam.internal.clients import storage
 import apache_beam.internal.clients.dataflow as dataflow
 
@@ -327,6 +327,9 @@ class Job(object):
 self.base64_str_re = re.compile(r'^[A-Za-z0-9+/]*=*$')
 self.coder_str_re = re.compile(r'^([A-Za-z]+\$)([A-Za-z0-9+/]*=*)$')
 
+  def json(self):
+return encoding.MessageToJson(self.proto)
+
 
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query 
jobs."""
@@ -392,8 +395,29 @@ class DataflowApplicationClient(object):
   # TODO(silviuc): Refactor so that retry logic can be applied.
   @retry.no_retries  # Using no_retries marks this as an integration point.
   def create_job(self, job):
-"""Submits for remote execution a job described by the workflow proto."""
-# Stage job resources and add an environment proto with their paths.
+"""Creates a job description.
+Additionally, it may stage it and/or submit it for remote execution.
+"""
+self.create_job_description(job)
+
+# Stage and submit the job when necessary
+dataflow_job_file = job.options.view_as(DebugOptions).dataflow_job_file
+template_location = (
+job.options.view_as(GoogleCloudOptions).template_location)
+
+job_location = template_location or dataflow_job_file
+if job_location:
+  gcs_or_local_path = os.path.dirname(job_location)
+  file_name = os.path.basename(job_location)
+  self.stage_file(gcs_or_local_path, file_name, StringIO(job.json()))
+
+if not template_location:
+  return self.submit_job_description()
+else:
+  return None
+
+  def create_job_description(self, job):
+"""Creates a job described by the 

[2/2] incubator-beam git commit: Closes #1342

2016-12-05 Thread robertwb
Closes #1342


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0d99856f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0d99856f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0d99856f

Branch: refs/heads/python-sdk
Commit: 0d99856f37d6bca9bb8d676ae36157bd0515a4f2
Parents: 7c5e4aa cfa0ad8
Author: Robert Bradshaw 
Authored: Mon Dec 5 11:04:35 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Dec 5 11:04:35 2016 -0800

--
 sdks/python/apache_beam/examples/wordcount.py   |  1 -
 sdks/python/apache_beam/internal/apiclient.py   | 34 +++-
 .../apache_beam/runners/dataflow_runner.py  | 13 ++-
 sdks/python/apache_beam/template_runner_test.py | 83 
 sdks/python/apache_beam/utils/options.py| 10 +++
 .../apache_beam/utils/pipeline_options_test.py  | 13 +++
 .../utils/pipeline_options_validator_test.py| 28 +++
 7 files changed, 175 insertions(+), 7 deletions(-)
--




[2/2] incubator-beam git commit: Removing a bug in .travis.yml that makes the build fail.

2016-12-02 Thread robertwb
Removing a bug in .travis.yml that makes the build fail.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/90db7908
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/90db7908
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/90db7908

Branch: refs/heads/python-sdk
Commit: 90db7908b807cb752c23c445b220b3d5dd08b36b
Parents: 9a175a5
Author: Pablo 
Authored: Tue Nov 29 13:58:55 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:15:41 2016 -0800

--
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/90db7908/.travis.yml
--
diff --git a/.travis.yml b/.travis.yml
index 3080341..470d2fc 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -59,7 +59,7 @@ before_install:
 install:
   - if [ ! "$TEST_PYTHON" ]; then travis_retry mvn -B install clean -U 
-DskipTests=true; fi
   - if [ "$TEST_PYTHON" ] && pip list | grep tox; then TOX_FILE=`which tox` ; 
export TOX_HOME=`dirname $TOX_FILE`; fi
-  - if [ "$TEST_PYTHON" ] && ! pip list | grep tox; then travis_retry pip 
install tox --user `whoami`; fi
+  - if [ "$TEST_PYTHON" ] && ! pip list | grep tox; then travis_retry pip 
install tox --user; fi
   # Removing this here protects from inadvertent caching
   - rm -rf "$HOME/.m2/repository/org/apache/beam"
 



[1/2] incubator-beam git commit: Closes #1456

2016-12-02 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 9a175a5fe -> 7c5e4aa66


Closes #1456


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c5e4aa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c5e4aa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c5e4aa6

Branch: refs/heads/python-sdk
Commit: 7c5e4aa66c3916b98cf7ecf932f11c2b057e1858
Parents: 9a175a5 90db790
Author: Robert Bradshaw 
Authored: Fri Dec 2 22:15:41 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:15:41 2016 -0800

--
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Call from_p12_keyfile() with the correct arguments.

2016-12-02 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 7c0bf25fa -> 9a175a5fe


Call from_p12_keyfile() with the correct arguments.

This code path was failing because a wrong list of arguments was passed.
Fixing that uncovered that oauth2client depends on pyOpenSSL for this
call to work. I did not add this dependency to setup.py because it does
not install cleanly in all environments.

As a workaround, users who would like to use this authentication method
can first do a 'pip install pyOpenSSL'. I added a test that skips if
'pyOpenSSL' is not installed.
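
A minimal sketch of the skip pattern described above, assuming an illustrative test class and method name rather than the actual test added in this commit.

    import unittest

    try:
      import OpenSSL  # pylint: disable=unused-import
      HAS_PYOPENSSL = True
    except ImportError:
      HAS_PYOPENSSL = False

    class P12KeyfileAuthTest(unittest.TestCase):

      @unittest.skipUnless(HAS_PYOPENSSL, 'pyOpenSSL is not installed')
      def test_p12_keyfile_credentials(self):
        # Exercise the ServiceAccountCredentials.from_p12_keyfile() code path here.
        pass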


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f23b717e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f23b717e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f23b717e

Branch: refs/heads/python-sdk
Commit: f23b717e7e433c30c0acee0fea8d179e6343b8b8
Parents: 7c0bf25
Author: Ahmet Altay 
Authored: Fri Dec 2 13:38:31 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:14:32 2016 -0800

--
 sdks/python/apache_beam/internal/auth.py|   6 +--
 sdks/python/apache_beam/internal/auth_test.py   |  44 +++
 sdks/python/apache_beam/tests/data/README.md|  20 +
 .../apache_beam/tests/data/privatekey.p12   | Bin 0 -> 2452 bytes
 sdks/python/setup.py|   2 +-
 5 files changed, 68 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f23b717e/sdks/python/apache_beam/internal/auth.py
--
diff --git a/sdks/python/apache_beam/internal/auth.py 
b/sdks/python/apache_beam/internal/auth.py
index a043fcf..056f40c 100644
--- a/sdks/python/apache_beam/internal/auth.py
+++ b/sdks/python/apache_beam/internal/auth.py
@@ -133,6 +133,7 @@ def get_service_credentials():
 'https://www.googleapis.com/auth/datastore'
 ]
 
+# TODO(BEAM-1068): Do not recreate options from sys.argv.
 # We are currently being run from the command line.
 google_cloud_options = PipelineOptions(
 sys.argv).view_as(GoogleCloudOptions)
@@ -151,8 +152,8 @@ def get_service_credentials():
 return ServiceAccountCredentials.from_p12_keyfile(
 google_cloud_options.service_account_name,
 google_cloud_options.service_account_key_file,
-client_scopes,
-user_agent=user_agent)
+private_key_password=None,
+scopes=client_scopes)
   except ImportError:
 with file(google_cloud_options.service_account_key_file) as f:
   service_account_key = f.read()
@@ -162,7 +163,6 @@ def get_service_credentials():
 service_account_key,
 client_scopes,
 user_agent=user_agent)
-
 else:
   try:
 credentials = _GCloudWrapperCredentials(user_agent)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f23b717e/sdks/python/apache_beam/internal/auth_test.py
--
diff --git a/sdks/python/apache_beam/internal/auth_test.py 
b/sdks/python/apache_beam/internal/auth_test.py
new file mode 100644
index 000..dfd408e
--- /dev/null
+++ b/sdks/python/apache_beam/internal/auth_test.py
@@ -0,0 +1,44 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""Unit tests for the auth module."""
+
+import os
+import sys
+import unittest
+
+import mock
+
+from apache_beam.internal import auth
+
+
+class AuthTest(unittest.TestCase):
+
+  def test_create_application_client(self):
+try:
+  test_args = [
+  'test', '--service_account_name', 'abc', '--service_account_key_file',
+  os.path.join(
+  os.path.dirname(__file__), '..', 'tests/data/privatekey.p12')]
+  with mock.patch.object(sys, 'argv', test_args):
+credentials = auth.get_service_credentials()
+self.assertIsNotNone(credentials)
+except NotImplementedError:
+  

[2/2] incubator-beam git commit: Closes #1491

2016-12-02 Thread robertwb
Closes #1491


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9a175a5f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9a175a5f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9a175a5f

Branch: refs/heads/python-sdk
Commit: 9a175a5fe17d087f2c3ca0ff8e6d53faad6beab4
Parents: 7c0bf25 f23b717
Author: Robert Bradshaw 
Authored: Fri Dec 2 22:14:33 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:14:33 2016 -0800

--
 sdks/python/apache_beam/internal/auth.py|   6 +--
 sdks/python/apache_beam/internal/auth_test.py   |  44 +++
 sdks/python/apache_beam/tests/data/README.md|  20 +
 .../apache_beam/tests/data/privatekey.p12   | Bin 0 -> 2452 bytes
 sdks/python/setup.py|   2 +-
 5 files changed, 68 insertions(+), 4 deletions(-)
--




[1/2] incubator-beam git commit: Add labels to lambdas in write finalization

2016-12-02 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 72fa21f98 -> 7c0bf25fa


Add labels to lambdas in write finalization


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/51e97d4a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/51e97d4a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/51e97d4a

Branch: refs/heads/python-sdk
Commit: 51e97d4a8d3f25608b6ee80f57c973186798d54f
Parents: 72fa21f
Author: Charles Chen 
Authored: Fri Dec 2 15:17:55 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:12:35 2016 -0800

--
 sdks/python/apache_beam/io/iobase.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/51e97d4a/sdks/python/apache_beam/io/iobase.py
--
diff --git a/sdks/python/apache_beam/io/iobase.py 
b/sdks/python/apache_beam/io/iobase.py
index b7cac3e..fd6ae57 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -767,10 +767,10 @@ class WriteImpl(ptransform.PTransform):
   write_result_coll = (pcoll | core.ParDo('write_bundles',
   _WriteBundleDoFn(), self.sink,
   AsSingleton(init_result_coll))
-   | core.Map(lambda x: (None, x))
+   | core.Map('pair', lambda x: (None, x))
| core.WindowInto(window.GlobalWindows())
| core.GroupByKey()
-   | core.FlatMap(lambda x: x[1]))
+   | core.FlatMap('extract', lambda x: x[1]))
 return do_once | core.FlatMap(
 'finalize_write',
 _finalize_write,



[2/2] incubator-beam git commit: Closes #1496

2016-12-02 Thread robertwb
Closes #1496


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c0bf25f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c0bf25f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c0bf25f

Branch: refs/heads/python-sdk
Commit: 7c0bf25fadbe2e74ab62c87c90111b8b7c34e297
Parents: 72fa21f 51e97d4
Author: Robert Bradshaw 
Authored: Fri Dec 2 22:12:36 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:12:36 2016 -0800

--
 sdks/python/apache_beam/io/iobase.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Make the legacy SQL flag consistent between Java and Python

2016-12-02 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 8365b6838 -> 72fa21f98


Make the legacy SQL flag consistent between Java and Python

Renamed the BigQuery use_legacy_sql parameter to use_standard_sql.
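
A minimal usage sketch of the renamed flag; the query is illustrative and follows the snippet updated in this commit.

    import apache_beam as beam

    source = beam.io.BigQuerySource(
        query='SELECT year, mean_temp FROM `samples.weather_stations`',
        use_standard_sql=True)  # previously spelled use_legacy_sql=False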


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72721031
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72721031
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72721031

Branch: refs/heads/python-sdk
Commit: 727210318404a585bb7742591ade0a09ccc20444
Parents: 8365b68
Author: Sourabh Bajaj 
Authored: Fri Dec 2 16:45:19 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:10:21 2016 -0800

--
 sdks/python/apache_beam/examples/snippets/snippets.py | 2 +-
 sdks/python/apache_beam/io/bigquery.py| 9 +
 sdks/python/apache_beam/io/bigquery_test.py   | 4 ++--
 3 files changed, 8 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/examples/snippets/snippets.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py 
b/sdks/python/apache_beam/examples/snippets/snippets.py
index 580a3d7..6dcf05e 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -962,7 +962,7 @@ def model_bigqueryio():
   'ReadYearAndTemp',
   beam.io.BigQuerySource(
   query='SELECT year, mean_temp FROM `samples.weather_stations`',
-  use_legacy_sql=False))
+  use_standard_sql=True))
   # [END model_bigqueryio_query_standard_sql]
 
   # [START model_bigqueryio_schema]

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/io/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/bigquery.py 
b/sdks/python/apache_beam/io/bigquery.py
index 0885e3a..ce75e10 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -323,7 +323,7 @@ class BigQuerySource(dataflow_io.NativeSource):
   """A source based on a BigQuery table."""
 
   def __init__(self, table=None, dataset=None, project=None, query=None,
-   validate=False, coder=None, use_legacy_sql=True):
+   validate=False, coder=None, use_standard_sql=False):
 """Initialize a BigQuerySource.
 
 Args:
@@ -351,8 +351,8 @@ class BigQuerySource(dataflow_io.NativeSource):
 in a file as a JSON serialized dictionary. This argument needs a value
 only in special cases when returning table rows as dictionaries is not
 desirable.
-  useLegacySql: Specifies whether to use BigQuery's legacy
-SQL dialect for this query. The default value is true. If set to false,
+  use_standard_sql: Specifies whether to use BigQuery's standard
+SQL dialect for this query. The default value is False. If set to True,
 the query will use BigQuery's updated SQL dialect with improved
 standards compliance. This parameter is ignored for table inputs.
 
@@ -374,7 +374,8 @@ class BigQuerySource(dataflow_io.NativeSource):
   self.use_legacy_sql = True
 else:
   self.query = query
-  self.use_legacy_sql = use_legacy_sql
+  # TODO(BEAM-1082): Change the internal flag to be standard_sql
+  self.use_legacy_sql = not use_standard_sql
   self.table_reference = None
 
 self.validate = validate

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/72721031/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py 
b/sdks/python/apache_beam/io/bigquery_test.py
index e263e13..a2cf947 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -199,7 +199,7 @@ class TestBigQuerySource(unittest.TestCase):
 hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_specify_query_sql_format(self):
-source = beam.io.BigQuerySource(query='my_query', use_legacy_sql=False)
+source = beam.io.BigQuerySource(query='my_query', use_standard_sql=True)
 self.assertEqual(source.query, 'my_query')
 self.assertFalse(source.use_legacy_sql)
 
@@ -371,7 +371,7 @@ class TestBigQueryReader(unittest.TestCase):
 jobComplete=True, rows=table_rows, schema=schema)
 actual_rows = []
 with beam.io.BigQuerySource(
-query='query', use_legacy_sql=False).reader(client) as reader:
+query='query', use_standard_sql=True).reader(client) as reader:
   for 

[2/2] incubator-beam git commit: Closes #1497

2016-12-02 Thread robertwb
Closes #1497


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/72fa21f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/72fa21f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/72fa21f9

Branch: refs/heads/python-sdk
Commit: 72fa21f98527e57cd5c7fad3977c95d7c994325e
Parents: 8365b68 7272103
Author: Robert Bradshaw 
Authored: Fri Dec 2 22:10:22 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:10:22 2016 -0800

--
 sdks/python/apache_beam/examples/snippets/snippets.py | 2 +-
 sdks/python/apache_beam/io/bigquery.py| 9 +
 sdks/python/apache_beam/io/bigquery_test.py   | 4 ++--
 3 files changed, 8 insertions(+), 7 deletions(-)
--




[2/2] incubator-beam git commit: Closes #1494

2016-12-02 Thread robertwb
Closes #1494


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8365b683
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8365b683
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8365b683

Branch: refs/heads/python-sdk
Commit: 8365b6838eda6dcef39097ab19b85b9af270914f
Parents: fd6a52c 16ffdb2
Author: Robert Bradshaw 
Authored: Fri Dec 2 22:06:42 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:06:42 2016 -0800

--
 sdks/python/apache_beam/examples/snippets/snippets.py  |  2 +-
 sdks/python/apache_beam/internal/apiclient_test.py |  1 +
 sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 10 +++---
 sdks/python/apache_beam/io/datastore/v1/helper.py  |  4 +++-
 4 files changed, 8 insertions(+), 9 deletions(-)
--




[1/2] incubator-beam git commit: Fix auth related unit test failures

2016-12-02 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk fd6a52c15 -> 8365b6838


Fix auth related unit test failures


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/16ffdb25
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/16ffdb25
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/16ffdb25

Branch: refs/heads/python-sdk
Commit: 16ffdb25f6029c4bee71f035d8d9747f6330ec9f
Parents: fd6a52c
Author: Vikas Kedigehalli 
Authored: Fri Dec 2 14:13:31 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 22:06:41 2016 -0800

--
 sdks/python/apache_beam/examples/snippets/snippets.py  |  2 +-
 sdks/python/apache_beam/internal/apiclient_test.py |  1 +
 sdks/python/apache_beam/io/datastore/v1/datastoreio.py | 10 +++---
 sdks/python/apache_beam/io/datastore/v1/helper.py  |  4 +++-
 4 files changed, 8 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/examples/snippets/snippets.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py 
b/sdks/python/apache_beam/examples/snippets/snippets.py
index c2a047f..580a3d7 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -919,7 +919,7 @@ def model_datastoreio():
   # [START model_datastoreio_write]
   p = beam.Pipeline(options=PipelineOptions())
   musicians = p | 'Musicians' >> beam.Create(
-  ['Mozart', 'Chopin', 'Beethoven', 'Bach'])
+  ['Mozart', 'Chopin', 'Beethoven', 'Vivaldi'])
 
   def to_entity(content):
 entity = entity_pb2.Entity()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/internal/apiclient_test.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient_test.py 
b/sdks/python/apache_beam/internal/apiclient_test.py
index 66cc8db..31b2dad 100644
--- a/sdks/python/apache_beam/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/internal/apiclient_test.py
@@ -25,6 +25,7 @@ from apache_beam.internal import apiclient
 
 class UtilTest(unittest.TestCase):
 
+  @unittest.skip("Enable once BEAM-1080 is fixed.")
   def test_create_application_client(self):
 pipeline_options = PipelineOptions()
 apiclient.DataflowApplicationClient(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
--
diff --git a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py 
b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
index 054002f..20466b9 100644
--- a/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
+++ b/sdks/python/apache_beam/io/datastore/v1/datastoreio.py
@@ -22,7 +22,6 @@ import logging
 from google.datastore.v1 import datastore_pb2
 from googledatastore import helper as datastore_helper
 
-from apache_beam.internal import auth
 from apache_beam.io.datastore.v1 import helper
 from apache_beam.io.datastore.v1 import query_splitter
 from apache_beam.transforms import Create
@@ -154,8 +153,7 @@ class ReadFromDatastore(PTransform):
   self._num_splits = num_splits
 
 def start_bundle(self, context):
-  self._datastore = helper.get_datastore(self._project,
- auth.get_service_credentials())
+  self._datastore = helper.get_datastore(self._project)
 
 def process(self, p_context, *args, **kwargs):
   # distinct key to be used to group query splits.
@@ -210,8 +208,7 @@ class ReadFromDatastore(PTransform):
   self._datastore = None
 
 def start_bundle(self, context):
-  self._datastore = helper.get_datastore(self._project,
- auth.get_service_credentials())
+  self._datastore = helper.get_datastore(self._project)
 
 def process(self, p_context, *args, **kwargs):
   query = p_context.element
@@ -341,8 +338,7 @@ class _Mutate(PTransform):
 
 def start_bundle(self, context):
   self._mutations = []
-  self._datastore = helper.get_datastore(self._project,
- auth.get_service_credentials())
+  self._datastore = helper.get_datastore(self._project)
 
 def process(self, context):
   self._mutations.append(context.element)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16ffdb25/sdks/python/apache_beam/io/datastore/v1/helper.py
--
diff --git 

[2/2] incubator-beam git commit: Closes #1481

2016-12-02 Thread robertwb
Closes #1481


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2363ee51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2363ee51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2363ee51

Branch: refs/heads/python-sdk
Commit: 2363ee510694dab20d925f7d4979fc6dcd495477
Parents: a463f00 557a2f9
Author: Robert Bradshaw 
Authored: Fri Dec 2 11:34:29 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 11:34:29 2016 -0800

--
 .../apache_beam/examples/snippets/snippets.py   | 41 
 .../examples/snippets/snippets_test.py  |  9 -
 2 files changed, 49 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Add snippet for datastoreio

2016-12-02 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk a463f000e -> 2363ee510


Add snippet for datastoreio


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/557a2f92
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/557a2f92
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/557a2f92

Branch: refs/heads/python-sdk
Commit: 557a2f92f67bfc545533fa35852485e9c4c0b785
Parents: a463f00
Author: Vikas Kedigehalli 
Authored: Thu Dec 1 10:27:05 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Dec 2 11:34:28 2016 -0800

--
 .../apache_beam/examples/snippets/snippets.py   | 41 
 .../examples/snippets/snippets_test.py  |  9 -
 2 files changed, 49 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py 
b/sdks/python/apache_beam/examples/snippets/snippets.py
index 8d05130..c2a047f 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -891,6 +891,47 @@ def model_textio(renames):
   p.run()
 
 
+def model_datastoreio():
+  """Using a Read and Write transform to read/write to Cloud Datastore.
+
+  URL: https://cloud.google.com/dataflow/model/datastoreio
+  """
+
+  import uuid
+  from google.datastore.v1 import entity_pb2
+  from google.datastore.v1 import query_pb2
+  import googledatastore
+  import apache_beam as beam
+  from apache_beam.utils.options import PipelineOptions
+  from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+  from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+
+  project = 'my_project'
+  kind = 'my_kind'
+  query = query_pb2.Query()
+  query.kind.add().name = kind
+
+  # [START model_datastoreio_read]
+  p = beam.Pipeline(options=PipelineOptions())
+  entities = p | 'Read From Datastore' >> ReadFromDatastore(project, query)
+  # [END model_datastoreio_read]
+
+  # [START model_datastoreio_write]
+  p = beam.Pipeline(options=PipelineOptions())
+  musicians = p | 'Musicians' >> beam.Create(
+  ['Mozart', 'Chopin', 'Beethoven', 'Bach'])
+
+  def to_entity(content):
+entity = entity_pb2.Entity()
+googledatastore.helper.add_key_path(entity.key, kind, str(uuid.uuid4()))
+googledatastore.helper.add_properties(entity, {'content': unicode(content)})
+return entity
+
+  entities = musicians | 'To Entity' >> beam.Map(to_entity)
+  entities | 'Write To Datastore' >> WriteToDatastore(project)
+  # [END model_datastoreio_write]
+
+
 def model_bigqueryio():
   """Using a Read and Write transform to read/write to BigQuery.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/557a2f92/sdks/python/apache_beam/examples/snippets/snippets_test.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py 
b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 72fccb2..09b4ba4 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -470,11 +470,18 @@ class SnippetsTest(unittest.TestCase):
 ['aa', 'bb', 'bb', 'cc', 'cc', 'cc'],
 self.get_output(result_path, suffix='.csv'))
 
+  def test_model_datastoreio(self):
+# We cannot test datastoreio functionality in unit tests therefore we limit
+# ourselves to making sure the pipeline containing Datastore read and write
+# transforms can be built.
+# TODO(vikasrk): Explore using Datastore Emulator.
+snippets.model_datastoreio()
+
   def test_model_bigqueryio(self):
 # We cannot test BigQueryIO functionality in unit tests therefore we limit
 # ourselves to making sure the pipeline containing BigQuery sources and
 # sinks can be built.
-self.assertEqual(None, snippets.model_bigqueryio())
+snippets.model_bigqueryio()
 
   def _run_test_pipeline_for_options(self, fn):
 temp_path = self.create_temp_file('aa\nbb\ncc')



[2/2] incubator-beam git commit: Closes #1472

2016-12-01 Thread robertwb
Closes #1472


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9ded359d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9ded359d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9ded359d

Branch: refs/heads/python-sdk
Commit: 9ded359daefc6040d61a1f33c77563474fcb09b6
Parents: 4414e20 dc68365
Author: Robert Bradshaw 
Authored: Thu Dec 1 09:40:32 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 1 09:40:32 2016 -0800

--
 sdks/python/apache_beam/examples/snippets/snippets.py | 9 +
 1 file changed, 9 insertions(+)
--




[1/2] incubator-beam git commit: Parse table schema from JSON

2016-12-01 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk aa9071d56 -> 739a43197


Parse table schema from JSON
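
A minimal usage sketch of the new helper added below; the schema JSON is illustrative.

    import json
    from apache_beam.io.bigquery import parse_table_schema_from_json

    schema_json = json.dumps(
        {'fields': [{'name': 's', 'type': 'STRING', 'mode': 'NULLABLE'}]})
    table_schema = parse_table_schema_from_json(schema_json)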


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4c2f62b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4c2f62b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4c2f62b

Branch: refs/heads/python-sdk
Commit: b4c2f62be8a809b666089e7b2fe5dada9cbd6c16
Parents: aa9071d
Author: Sourabh Bajaj 
Authored: Wed Nov 30 13:48:28 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 1 09:07:27 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  | 37 
 sdks/python/apache_beam/io/bigquery_test.py | 22 ++
 2 files changed, 59 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/bigquery.py 
b/sdks/python/apache_beam/io/bigquery.py
index 8d7892a..0885e3a 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -200,6 +200,43 @@ class TableRowJsonCoder(coders.Coder):
 f=[bigquery.TableCell(v=to_json_value(e)) for e in od.itervalues()])
 
 
+def parse_table_schema_from_json(schema_string):
+  """Parse the Table Schema provided as string.
+
+  Args:
+schema_string: String serialized table schema, should be a valid JSON.
+
+  Returns:
+A TableSchema of the BigQuery export from either the Query or the Table.
+  """
+  json_schema = json.loads(schema_string)
+
+  def _parse_schema_field(field):
+"""Parse a single schema field from dictionary.
+
+Args:
+  field: Dictionary object containing serialized schema.
+
+Returns:
+  A TableFieldSchema for a single column in BigQuery.
+"""
+schema = bigquery.TableFieldSchema()
+schema.name = field['name']
+schema.type = field['type']
+if 'mode' in field:
+  schema.mode = field['mode']
+else:
+  schema.mode = 'NULLABLE'
+if 'description' in field:
+  schema.description = field['description']
+if 'fields' in field:
+  schema.fields = [_parse_schema_field(x) for x in field['fields']]
+return schema
+
+  fields = [_parse_schema_field(f) for f in json_schema['fields']]
+  return bigquery.TableSchema(fields=fields)
+
+
 class BigQueryDisposition(object):
   """Class holding standard strings used for create and write dispositions."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b4c2f62b/sdks/python/apache_beam/io/bigquery_test.py
--
diff --git a/sdks/python/apache_beam/io/bigquery_test.py 
b/sdks/python/apache_beam/io/bigquery_test.py
index b0c3bbe..e263e13 100644
--- a/sdks/python/apache_beam/io/bigquery_test.py
+++ b/sdks/python/apache_beam/io/bigquery_test.py
@@ -32,6 +32,7 @@ from apache_beam.internal.clients import bigquery
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.io.bigquery import RowAsDictJsonCoder
 from apache_beam.io.bigquery import TableRowJsonCoder
+from apache_beam.io.bigquery import parse_table_schema_from_json
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.options import PipelineOptions
@@ -113,6 +114,27 @@ class TestTableRowJsonCoder(unittest.TestCase):
 self.json_compliance_exception(float('-inf'))
 
 
+class TestTableSchemaParser(unittest.TestCase):
+  def test_parse_table_schema_from_json(self):
+string_field = bigquery.TableFieldSchema(
+name='s', type='STRING', mode='NULLABLE', description='s description')
+number_field = bigquery.TableFieldSchema(
+name='n', type='INTEGER', mode='REQUIRED', description='n description')
+record_field = bigquery.TableFieldSchema(
+name='r', type='RECORD', mode='REQUIRED', description='r description',
+fields=[string_field, number_field])
+expected_schema = bigquery.TableSchema(fields=[record_field])
+json_str = json.dumps({'fields': [
+{'name': 'r', 'type': 'RECORD', 'mode': 'REQUIRED',
+ 'description': 'r description', 'fields': [
+ {'name': 's', 'type': 'STRING', 'mode': 'NULLABLE',
+  'description': 's description'},
+ {'name': 'n', 'type': 'INTEGER', 'mode': 'REQUIRED',
+  'description': 'n description'}]}]})
+self.assertEqual(parse_table_schema_from_json(json_str),
+ expected_schema)
+
+
 class TestBigQuerySource(unittest.TestCase):
 
   def test_display_data_item_on_validate_true(self):



[2/2] incubator-beam git commit: Closes #1468

2016-12-01 Thread robertwb
Closes #1468


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/739a4319
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/739a4319
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/739a4319

Branch: refs/heads/python-sdk
Commit: 739a431975120fe267e6b81635ce6e2356bd2895
Parents: aa9071d b4c2f62
Author: Robert Bradshaw 
Authored: Thu Dec 1 09:07:28 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Dec 1 09:07:28 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  | 37 
 sdks/python/apache_beam/io/bigquery_test.py | 22 ++
 2 files changed, 59 insertions(+)
--




[2/2] incubator-beam git commit: Closes #1454

2016-11-30 Thread robertwb
Closes #1454


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/aa9071d5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/aa9071d5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/aa9071d5

Branch: refs/heads/python-sdk
Commit: aa9071d56120a1c0f91cc580fb956570113ee104
Parents: 70c1de9 495a2d8
Author: Robert Bradshaw 
Authored: Wed Nov 30 14:08:58 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Nov 30 14:08:58 2016 -0800

--
 .../examples/cookbook/datastore_wordcount.py| 210 +++
 .../apache_beam/examples/datastore_wordcount.py | 203 --
 sdks/python/setup.py|   2 +-
 3 files changed, 211 insertions(+), 204 deletions(-)
--




[1/2] incubator-beam git commit: Few datastoreio fixes

2016-11-30 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 70c1de9b9 -> aa9071d56


Few datastoreio fixes

  - pin googledatastore version to 6.4.1
  - add num_shards options to datastore wordcount example
  - move datastore wordcount example to cookbook directory
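
A minimal sketch of the version pin from the first bullet; the surrounding setup.py contents and the exact pin operator are assumptions, since the setup.py hunk is not shown in this message.

    REQUIRED_PACKAGES = [
        # ... other dependencies elided ...
        'googledatastore==6.4.1',  # assumed exact pin per the commit message
    ]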


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/495a2d8c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/495a2d8c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/495a2d8c

Branch: refs/heads/python-sdk
Commit: 495a2d8c01949e4248d7e6dc9ad7a04168a292d1
Parents: 70c1de9
Author: Vikas Kedigehalli 
Authored: Tue Nov 29 10:02:14 2016 -0800
Committer: Vikas Kedigehalli 
Committed: Wed Nov 30 12:18:53 2016 -0800

--
 .../examples/cookbook/datastore_wordcount.py| 210 +++
 .../apache_beam/examples/datastore_wordcount.py | 203 --
 sdks/python/setup.py|   2 +-
 3 files changed, 211 insertions(+), 204 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/495a2d8c/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py 
b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
new file mode 100644
index 000..eb62614
--- /dev/null
+++ b/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py
@@ -0,0 +1,210 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""A word-counting workflow that uses Google Cloud Datastore."""
+
+from __future__ import absolute_import
+
+import argparse
+import logging
+import re
+import uuid
+
+from google.datastore.v1 import entity_pb2
+from google.datastore.v1 import query_pb2
+from googledatastore import helper as datastore_helper, PropertyFilter
+
+import apache_beam as beam
+from apache_beam.io.datastore.v1.datastoreio import ReadFromDatastore
+from apache_beam.io.datastore.v1.datastoreio import WriteToDatastore
+from apache_beam.utils.options import GoogleCloudOptions
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
+
+empty_line_aggregator = beam.Aggregator('emptyLines')
+average_word_size_aggregator = beam.Aggregator('averageWordLength',
+   beam.combiners.MeanCombineFn(),
+   float)
+
+
+class WordExtractingDoFn(beam.DoFn):
+  """Parse each line of input text into words."""
+
+  def process(self, context):
+"""Returns an iterator over words in contents of Cloud Datastore entity.
+The element is a line of text.  If the line is blank, note that, too.
+Args:
+  context: the call-specific context: data and aggregator.
+Returns:
+  The processed element.
+"""
+content_value = context.element.properties.get('content', None)
+text_line = ''
+if content_value:
+  text_line = content_value.string_value
+
+if not text_line:
+  context.aggregate_to(empty_line_aggregator, 1)
+words = re.findall(r'[A-Za-z\']+', text_line)
+for w in words:
+  context.aggregate_to(average_word_size_aggregator, len(w))
+return words
+
+
+class EntityWrapper(object):
+  """Create a Cloud Datastore entity from the given string."""
+  def __init__(self, namespace, kind, ancestor):
+self._namespace = namespace
+self._kind = kind
+self._ancestor = ancestor
+
+  def make_entity(self, content):
+entity = entity_pb2.Entity()
+if self._namespace is not None:
+  entity.key.partition_id.namespace_id = self._namespace
+
+# All entities created will have the same ancestor
+datastore_helper.add_key_path(entity.key, self._kind, self._ancestor,
+  self._kind, str(uuid.uuid4()))
+
+datastore_helper.add_properties(entity, {"content": unicode(content)})
+return entity
+
+
+def 

[1/2] incubator-beam git commit: Improve GcsIO throughput by 10x

2016-11-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk cce4331dc -> c1440f7aa


Improve GcsIO throughput by 10x

This change increases the read buffer size from 1 MB to 16 MB.  Previously,
the speeds of reading an incompressible file were: (50 MB: 3.17 MB/s,
100 MB: 3.79 MB/s, 200 MB: 4.13 MB/s, 400 MB: 4.24 MB/s).

The speeds are now improved to: (50 MB: 24.21 MB/s, 100 MB: 42.70 MB/s,
200 MB: 42.89 MB/s, 400 MB: 46.92 MB/s).
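
A minimal sketch of the parameter change behind these numbers; the stream setup is illustrative, and the Download call mirrors the diff below.

    from StringIO import StringIO
    from apitools.base.py import transfer

    DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024  # raised from 1024 * 1024

    download_stream = StringIO()
    downloader = transfer.Download(
        download_stream, auto_transfer=False,
        chunksize=DEFAULT_READ_BUFFER_SIZE)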


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e4a332d9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e4a332d9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e4a332d9

Branch: refs/heads/python-sdk
Commit: e4a332d9de5eca941e08f23242cd63bb83148312
Parents: cce4331
Author: Charles Chen 
Authored: Thu Nov 17 11:46:44 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 21:53:26 2016 -0800

--
 sdks/python/apache_beam/io/gcsio.py | 20 ++--
 1 file changed, 18 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e4a332d9/sdks/python/apache_beam/io/gcsio.py
--
diff --git a/sdks/python/apache_beam/io/gcsio.py 
b/sdks/python/apache_beam/io/gcsio.py
index 1b08994..4f310be 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -47,7 +47,23 @@ except ImportError:
   'Google Cloud Storage I/O not supported for this execution environment '
   '(could not import storage API client).')
 
-DEFAULT_READ_BUFFER_SIZE = 1024 * 1024
+# This is the size of each partial-file read operation from GCS.  This
+# parameter was chosen to give good throughput while keeping memory usage at
+# a reasonable level; the following table shows throughput reached when
+# reading files of a given size with a chosen buffer size and informed the
+# choice of the value, as of 11/2016:
+#
+# +---++-+-+-+
+# |   | 50 MB file | 100 MB file | 200 MB file | 400 MB file |
+# +---++-+-+-+
+# | 8 MB buffer   | 17.12 MB/s | 22.67 MB/s  | 23.81 MB/s  | 26.05 MB/s  |
+# | 16 MB buffer  | 24.21 MB/s | 42.70 MB/s  | 42.89 MB/s  | 46.92 MB/s  |
+# | 32 MB buffer  | 28.53 MB/s | 48.08 MB/s  | 54.30 MB/s  | 54.65 MB/s  |
+# | 400 MB buffer | 34.72 MB/s | 71.13 MB/s  | 79.13 MB/s  | 85.39 MB/s  |
+# +---++-+-+-+
+DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
+
+# This is the size of chunks used when writing to GCS.
 WRITE_CHUNK_SIZE = 8 * 1024 * 1024
 
 
@@ -373,7 +389,7 @@ class GcsBufferedReader(object):
 # Initialize read buffer state.
 self.download_stream = StringIO.StringIO()
 self.downloader = transfer.Download(
-self.download_stream, auto_transfer=False)
+self.download_stream, auto_transfer=False, chunksize=buffer_size)
 self.client.objects.Get(get_request, download=self.downloader)
 self.position = 0
 self.buffer = ''



[2/2] incubator-beam git commit: Closes #1379

2016-11-18 Thread robertwb
Closes #1379


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1440f7a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1440f7a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1440f7a

Branch: refs/heads/python-sdk
Commit: c1440f7aa69f0134d52463c4bcdcabce36b481d7
Parents: cce4331 e4a332d
Author: Robert Bradshaw 
Authored: Fri Nov 18 21:53:27 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 21:53:27 2016 -0800

--
 sdks/python/apache_beam/io/gcsio.py | 20 ++--
 1 file changed, 18 insertions(+), 2 deletions(-)
--




[2/2] incubator-beam git commit: Closes #1380

2016-11-18 Thread robertwb
Closes #1380


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cce4331d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cce4331d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cce4331d

Branch: refs/heads/python-sdk
Commit: cce4331dc7ed95aa32654e77d2cc170b63437183
Parents: 45b420d 99bcafe
Author: Robert Bradshaw 
Authored: Fri Nov 18 13:37:05 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 13:37:05 2016 -0800

--
 sdks/python/apache_beam/io/fileio.py  |  5 +++
 sdks/python/apache_beam/io/fileio_test.py | 47 ++
 2 files changed, 52 insertions(+)
--




[1/2] incubator-beam git commit: Fix issue where batch GCS renames were not issued

2016-11-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 45b420d82 -> cce4331dc


Fix issue where batch GCS renames were not issued
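
A minimal sketch of the corrected batching loop; the batch-size constant and the final flush for a trailing partial batch are illustrative assumptions, while the append inside the loop is the line this commit restores.

    MAX_BATCH_OPERATION_SIZE = 100  # illustrative; the real constant lives in gcsio

    def batch_rename_pairs(src_dest_pairs):
      gcs_batches = []
      gcs_current_batch = []
      for src, dest in src_dest_pairs:
        gcs_current_batch.append((src, dest))  # previously missing, so batches stayed empty
        if len(gcs_current_batch) == MAX_BATCH_OPERATION_SIZE:
          gcs_batches.append(gcs_current_batch)
          gcs_current_batch = []
      if gcs_current_batch:  # flush the trailing partial batch
        gcs_batches.append(gcs_current_batch)
      return gcs_batches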


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/99bcafe7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/99bcafe7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/99bcafe7

Branch: refs/heads/python-sdk
Commit: 99bcafe7a02bbec5222d77abbad24f5eed8a687f
Parents: 45b420d
Author: Charles Chen 
Authored: Thu Nov 17 14:13:56 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 13:37:04 2016 -0800

--
 sdks/python/apache_beam/io/fileio.py  |  5 +++
 sdks/python/apache_beam/io/fileio_test.py | 47 ++
 2 files changed, 52 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index 3b67c4f..4d0eea6 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -514,6 +514,7 @@ class ChannelFactory(object):
 gcs_batches = []
 gcs_current_batch = []
 for src, dest in src_dest_pairs:
+  gcs_current_batch.append((src, dest))
   if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
 gcs_batches.append(gcs_current_batch)
 gcs_current_batch = []
@@ -893,6 +894,8 @@ class FileSink(iobase.Sink):
   exception_infos = ChannelFactory.rename_batch(batch)
   for src, dest, exception in exception_infos:
 if exception:
+  logging.warning('Rename not successful: %s -> %s, %s', src, dest,
+  exception)
   should_report = True
   if isinstance(exception, IOError):
 # May have already been copied.
@@ -906,6 +909,8 @@ class FileSink(iobase.Sink):
 logging.warning(('Exception in _rename_batch. src: %s, '
  'dest: %s, err: %s'), src, dest, exception)
 exceptions.append(exception)
+else:
+  logging.debug('Rename successful: %s -> %s', src, dest)
   return exceptions
 
 # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/99bcafe7/sdks/python/apache_beam/io/fileio_test.py
--
diff --git a/sdks/python/apache_beam/io/fileio_test.py 
b/sdks/python/apache_beam/io/fileio_test.py
index 63e71e0..9d1e424 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -28,6 +28,7 @@ import unittest
 import zlib
 
 import hamcrest as hc
+import mock
 
 import apache_beam as beam
 from apache_beam import coders
@@ -881,6 +882,52 @@ class TestFileSink(unittest.TestCase):
 with self.assertRaises(Exception):
   list(sink.finalize_write(init_token, [res1, res2]))
 
+  @mock.patch('apache_beam.io.fileio.ChannelFactory.rename')
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_rename_batch(self, *unused_args):
+# Prepare mocks.
+gcsio_mock = mock.MagicMock()
+fileio.gcsio.GcsIO = lambda: gcsio_mock
+fileio.ChannelFactory.rename = mock.MagicMock()
+to_rename = [
+('gs://bucket/from1', 'gs://bucket/to1'),
+('gs://bucket/from2', 'gs://bucket/to2'),
+('/local/from1', '/local/to1'),
+('gs://bucket/from3', 'gs://bucket/to3'),
+('/local/from2', '/local/to2'),
+]
+gcsio_mock.copy_batch.side_effect = [[
+('gs://bucket/from1', 'gs://bucket/to1', None),
+('gs://bucket/from2', 'gs://bucket/to2', None),
+('gs://bucket/from3', 'gs://bucket/to3', None),
+]]
+gcsio_mock.delete_batch.side_effect = [[
+('gs://bucket/from1', None),
+('gs://bucket/from2', None),
+('gs://bucket/from3', None),
+]]
+
+# Issue batch rename.
+fileio.ChannelFactory.rename_batch(to_rename)
+
+# Verify mocks.
+expected_local_rename_calls = [
+mock.call('/local/from1', '/local/to1'),
+mock.call('/local/from2', '/local/to2'),
+]
+self.assertEqual(fileio.ChannelFactory.rename.call_args_list,
+ expected_local_rename_calls)
+gcsio_mock.copy_batch.assert_called_once_with([
+('gs://bucket/from1', 'gs://bucket/to1'),
+('gs://bucket/from2', 'gs://bucket/to2'),
+('gs://bucket/from3', 'gs://bucket/to3'),
+])
+gcsio_mock.delete_batch.assert_called_once_with([
+'gs://bucket/from1',
+'gs://bucket/from2',
+'gs://bucket/from3',
+])
+
 
 if __name__ 

[1/2] incubator-beam git commit: Remove redundant REQUIRED_PACKAGES

2016-11-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3a0f01c8e -> 45b420d82


Remove redundant REQUIRED_PACKAGES


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/329be6e9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/329be6e9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/329be6e9

Branch: refs/heads/python-sdk
Commit: 329be6e9a16bfe865d61c3d6041ec5fb6707fc6a
Parents: 3a0f01c
Author: Ahmet Altay 
Authored: Thu Nov 17 15:20:08 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 13:35:47 2016 -0800

--
 sdks/python/setup.py | 3 ---
 1 file changed, 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/329be6e9/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index b8034af..1299bbf 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -96,9 +96,6 @@ REQUIRED_PACKAGES = [
 'python-gflags>=2.0,<4.0.0',
 'pyyaml>=3.10,<4.0.0',
 ]
-REQUIRED_TEST_PACKAGES = [
-'pyhamcrest>=1.9,<2.0',
-]
 
 
 REQUIRED_TEST_PACKAGES = [



[2/2] incubator-beam git commit: Closes #1383

2016-11-18 Thread robertwb
Closes #1383


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/45b420d8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/45b420d8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/45b420d8

Branch: refs/heads/python-sdk
Commit: 45b420d82aa6f47e3d37f5aa5ba98378cdc01e9c
Parents: 3a0f01c 329be6e
Author: Robert Bradshaw 
Authored: Fri Nov 18 13:35:48 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 13:35:48 2016 -0800

--
 sdks/python/setup.py | 3 ---
 1 file changed, 3 deletions(-)
--




[2/2] incubator-beam git commit: Closes #1385

2016-11-18 Thread robertwb
Closes #1385


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3a0f01c8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3a0f01c8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3a0f01c8

Branch: refs/heads/python-sdk
Commit: 3a0f01c8edd36fe525b8ad155011dfb759dad2b4
Parents: b83f12b 93c5233
Author: Robert Bradshaw 
Authored: Fri Nov 18 13:33:46 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 13:33:46 2016 -0800

--
 sdks/python/apache_beam/io/filebasedsource.py | 14 ++
 .../python/apache_beam/io/filebasedsource_test.py | 18 +-
 2 files changed, 27 insertions(+), 5 deletions(-)
--




[1/2] incubator-beam git commit: Fixes a couple of issues of FileBasedSource.

2016-11-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b83f12b9f -> 3a0f01c8e


Fixes a couple of issues of FileBasedSource.

(1) Updates code so that a user-specified coder properly gets set on
sub-sources.

(2) Currently each SingleFileSource takes a reference to FileBasedSource,
while FileBasedSource takes a reference to ConcatSource, and ConcatSource has
a reference to the list of SingleFileSources. This results in quadratic space
complexity when serializing splits of a FileBasedSource. This CL fixes the
issue by making sure that FileBasedSource is cloned before it takes a
reference to ConcatSource.
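
As context for (2), the blow-up is easy to reproduce outside Beam. A minimal
sketch in plain Python (pickle and deepcopy stand in for the SDK's pickler;
the class names are made up):

import copy
import pickle


class Parent(object):
  def __init__(self):
    self.children = []


class Child(object):
  def __init__(self, parent):
    self.parent = parent


def make_children(parent, n, clone_parent):
  # With clone_parent=True each child references a child-free copy of the
  # parent, which is what the pickler.loads(pickler.dumps(self)) call in the
  # diff below achieves for FileBasedSource.
  ref = copy.deepcopy(parent) if clone_parent else parent
  for _ in range(n):
    parent.children.append(Child(ref))
  return parent.children


naive = make_children(Parent(), 100, clone_parent=False)
cloned = make_children(Parent(), 100, clone_parent=True)
# The naive child's serialized size grows with the number of siblings, while
# the cloned child's size stays roughly constant.
print(len(pickle.dumps(naive[0])), len(pickle.dumps(cloned[0])))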


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/93c5233a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/93c5233a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/93c5233a

Branch: refs/heads/python-sdk
Commit: 93c5233a1bf28e9b13412b909c2ee877bd6cf635
Parents: b83f12b
Author: Chamikara Jayalath 
Authored: Thu Nov 17 19:18:26 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 18 13:33:33 2016 -0800

--
 sdks/python/apache_beam/io/filebasedsource.py | 14 ++
 .../python/apache_beam/io/filebasedsource_test.py | 18 +-
 2 files changed, 27 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py 
b/sdks/python/apache_beam/io/filebasedsource.py
index c7bc27e..7d8f686 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -109,6 +109,12 @@ class FileBasedSource(iobase.BoundedSource):
   file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
   sizes = FileBasedSource._estimate_sizes_in_parallel(file_names)
 
+  # We create a reference for FileBasedSource that will be serialized along
+  # with each _SingleFileSource. To prevent this FileBasedSource from 
having
+  # a reference to ConcatSource (resulting in quadratic space complexity)
+  # we clone it here.
+  file_based_source_ref = pickler.loads(pickler.dumps(self))
+
   for index, file_name in enumerate(file_names):
 if sizes[index] == 0:
   continue  # Ignoring empty file.
@@ -123,7 +129,7 @@ class FileBasedSource(iobase.BoundedSource):
 splittable = False
 
 single_file_source = _SingleFileSource(
-self, file_name,
+file_based_source_ref, file_name,
 0,
 sizes[index],
 min_bundle_size=self._min_bundle_size,
@@ -194,9 +200,6 @@ class FileBasedSource(iobase.BoundedSource):
 return self._get_concat_source().get_range_tracker(start_position,
stop_position)
 
-  def default_output_coder(self):
-return self._get_concat_source().default_output_coder()
-
   def read_records(self, file_name, offset_range_tracker):
 """Returns a generator of records created by reading file 'file_name'.
 
@@ -315,3 +318,6 @@ class _SingleFileSource(iobase.BoundedSource):
 
   def read(self, range_tracker):
 return self._file_based_source.read_records(self._file_name, range_tracker)
+
+  def default_output_coder(self):
+return self._file_based_source.default_output_coder()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/93c5233a/sdks/python/apache_beam/io/filebasedsource_test.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py 
b/sdks/python/apache_beam/io/filebasedsource_test.py
index 7f4d8d3..a455cd3 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -533,6 +533,23 @@ class TestFileBasedSource(unittest.TestCase):
 assert_that(pcoll, equal_to(lines))
 pipeline.run()
 
+  def test_splits_get_coder_from_fbs(self):
+class DummyCoder(object):
+  val = 12345
+
+class FileBasedSourceWithCoder(LineSource):
+
+  def default_output_coder(self):
+return DummyCoder()
+
+pattern, expected_data = write_pattern([34, 66, 40, 24, 24, 12])
+self.assertEqual(200, len(expected_data))
+fbs = FileBasedSourceWithCoder(pattern)
+splits = [split for split in fbs.split(desired_bundle_size=50)]
+self.assertTrue(len(splits))
+for split in splits:
+  self.assertEqual(DummyCoder.val, split.source.default_output_coder().val)
+
 
 class TestSingleFileSource(unittest.TestCase):
 
@@ -685,7 +702,6 @@ class TestSingleFileSource(unittest.TestCase):
   

[2/2] incubator-beam git commit: Fix shared state across retry decorated functions

2016-11-17 Thread robertwb
Fix shared state across retry decorated functions
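
The class of bug being fixed can be shown with a toy decorator (illustrative
only, not the Beam retry module): state built at decoration time is shared by
every call and every instance, so a retry budget created outside the wrapper
is spent once and never replenished.

import functools


def retrying_buggy(num_retries):
  def decorator(fun):
    budget = iter(range(num_retries))  # created once, shared by all calls
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
      for _ in budget:
        try:
          return fun(*args, **kwargs)
        except ValueError:
          pass
      raise RuntimeError('retry budget exhausted')
    return wrapper
  return decorator


def retrying_fixed(num_retries):
  def decorator(fun):
    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
      budget = iter(range(num_retries))  # fresh budget for every call
      for _ in budget:
        try:
          return fun(*args, **kwargs)
        except ValueError:
          pass
      raise RuntimeError('retry budget exhausted')
    return wrapper
  return decorator

The Beam fix below is the same move: the FuzzedExponentialIntervals iterator
is now constructed inside wrapper, and the new RetryStateTest guards against
the cross-object interference.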


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7f689a7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7f689a7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7f689a7

Branch: refs/heads/python-sdk
Commit: e7f689a72143cc5c821449c862c732f877ca4645
Parents: 115cf33
Author: Sourabh Bajaj 
Authored: Tue Nov 15 15:04:37 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Nov 17 12:21:43 2016 -0800

--
 sdks/python/apache_beam/utils/retry.py  |  8 ++---
 sdks/python/apache_beam/utils/retry_test.py | 42 
 2 files changed, 45 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7f689a7/sdks/python/apache_beam/utils/retry.py
--
diff --git a/sdks/python/apache_beam/utils/retry.py 
b/sdks/python/apache_beam/utils/retry.py
index 1f5af88..b3016fd 100644
--- a/sdks/python/apache_beam/utils/retry.py
+++ b/sdks/python/apache_beam/utils/retry.py
@@ -152,12 +152,10 @@ def with_exponential_backoff(
 
   def real_decorator(fun):
 """The real decorator whose purpose is to return the wrapped function."""
-
-retry_intervals = iter(
-FuzzedExponentialIntervals(
-initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0))
-
 def wrapper(*args, **kwargs):
+  retry_intervals = iter(
+  FuzzedExponentialIntervals(
+  initial_delay_secs, num_retries, fuzz=0.5 if fuzz else 0))
   while True:
 try:
   return fun(*args, **kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7f689a7/sdks/python/apache_beam/utils/retry_test.py
--
diff --git a/sdks/python/apache_beam/utils/retry_test.py 
b/sdks/python/apache_beam/utils/retry_test.py
index 705c555..7570ca0 100644
--- a/sdks/python/apache_beam/utils/retry_test.py
+++ b/sdks/python/apache_beam/utils/retry_test.py
@@ -164,5 +164,47 @@ class RetryTest(unittest.TestCase):
   self.assertEqual(func_name, 'transient_failure')
 
 
+class DummyClass(object):
+  def __init__(self, results):
+self.index = 0
+self.results = results
+
+  @retry.with_exponential_backoff(num_retries=2, initial_delay_secs=0.1)
+  def func(self):
+self.index += 1
+if self.index > len(self.results) or \
+self.results[self.index - 1] == "Error":
+  raise ValueError("Error")
+return self.results[self.index - 1]
+
+
+class RetryStateTest(unittest.TestCase):
+  """The test_two_failures and test_single_failure would fail if we have
+  any shared state for the retry decorator. This test tries to prevent a bug we
+  found where the state in the decorator was shared across objects and retries
+  were not available correctly.
+
+  The test_call_two_objects would test this inside the same test.
+  """
+  def test_two_failures(self):
+dummy = DummyClass(["Error", "Error", "Success"])
+dummy.func()
+self.assertEqual(3, dummy.index)
+
+  def test_single_failure(self):
+dummy = DummyClass(["Error", "Success"])
+dummy.func()
+self.assertEqual(2, dummy.index)
+
+  def test_call_two_objects(self):
+dummy = DummyClass(["Error", "Error", "Success"])
+dummy.func()
+self.assertEqual(3, dummy.index)
+
+dummy2 = DummyClass(["Error", "Success"])
+dummy2.func()
+self.assertEqual(2, dummy2.index)
+
+
 if __name__ == '__main__':
   unittest.main()



[1/2] incubator-beam git commit: Closes #1365

2016-11-17 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 115cf33ed -> b83f12b9f


Closes #1365


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b83f12b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b83f12b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b83f12b9

Branch: refs/heads/python-sdk
Commit: b83f12b9fff05755323afd655966bf7c3ee03334
Parents: 115cf33 e7f689a
Author: Robert Bradshaw 
Authored: Thu Nov 17 12:21:43 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Nov 17 12:21:43 2016 -0800

--
 sdks/python/apache_beam/utils/retry.py  |  8 ++---
 sdks/python/apache_beam/utils/retry_test.py | 42 
 2 files changed, 45 insertions(+), 5 deletions(-)
--




[1/2] incubator-beam git commit: Upgrade Datastore version

2016-11-17 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 21b7844bb -> 115cf33ed


Upgrade Datastore version


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/249c9f4c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/249c9f4c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/249c9f4c

Branch: refs/heads/python-sdk
Commit: 249c9f4cdfae17b1f760cbdeb2a7446439fd
Parents: 21b7844
Author: Vikas Kedigehalli 
Authored: Thu Nov 17 10:43:35 2016 -0800
Committer: Vikas Kedigehalli 
Committed: Thu Nov 17 10:43:35 2016 -0800

--
 sdks/python/setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/249c9f4c/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index af59069..b8034af 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -87,7 +87,7 @@ REQUIRED_PACKAGES = [
 'avro>=1.7.7,<2.0.0',
 'dill>=0.2.5,<0.3',
 'google-apitools>=0.5.2,<1.0.0',
-'googledatastore==6.3.0',
+'googledatastore==6.4.0',
 'httplib2>=0.8,<0.10',
 'mock>=1.0.1,<3.0.0',
 'oauth2client>=2.0.1,<4.0.0',



[2/2] incubator-beam git commit: Closes #1375

2016-11-17 Thread robertwb
Closes #1375


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/115cf33e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/115cf33e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/115cf33e

Branch: refs/heads/python-sdk
Commit: 115cf33ede4f6e0267a9ec8757620efb51ceb904
Parents: 21b7844 249c9f4
Author: Robert Bradshaw 
Authored: Thu Nov 17 12:20:41 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Nov 17 12:20:41 2016 -0800

--
 sdks/python/setup.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Query Splitter for Datastore v1

2016-11-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d1fccbf5e -> 21b7844bb


Query Splitter for Datastore v1


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c1126b70
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c1126b70
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c1126b70

Branch: refs/heads/python-sdk
Commit: c1126b708469fc63bd8ab8e54026700408ec34da
Parents: d1fccbf
Author: Vikas Kedigehalli 
Authored: Mon Oct 24 18:29:29 2016 -0700
Committer: Robert Bradshaw 
Committed: Tue Nov 15 14:24:02 2016 -0800

--
 .../python/apache_beam/io/datastore/__init__.py |  16 ++
 .../apache_beam/io/datastore/v1/__init__.py |  16 ++
 .../apache_beam/io/datastore/v1/helper.py   |  84 ++
 .../apache_beam/io/datastore/v1/helper_test.py  | 124 +
 .../io/datastore/v1/query_splitter.py   | 270 +++
 .../io/datastore/v1/query_splitter_test.py  | 257 ++
 sdks/python/setup.py|   1 +
 7 files changed, 768 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/__init__.py
--
diff --git a/sdks/python/apache_beam/io/datastore/__init__.py 
b/sdks/python/apache_beam/io/datastore/__init__.py
new file mode 100644
index 000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/__init__.py
--
diff --git a/sdks/python/apache_beam/io/datastore/v1/__init__.py 
b/sdks/python/apache_beam/io/datastore/v1/__init__.py
new file mode 100644
index 000..cce3aca
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/__init__.py
@@ -0,0 +1,16 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c1126b70/sdks/python/apache_beam/io/datastore/v1/helper.py
--
diff --git a/sdks/python/apache_beam/io/datastore/v1/helper.py 
b/sdks/python/apache_beam/io/datastore/v1/helper.py
new file mode 100644
index 000..626ab35
--- /dev/null
+++ b/sdks/python/apache_beam/io/datastore/v1/helper.py
@@ -0,0 +1,84 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing 

[2/2] incubator-beam git commit: Closes #1310

2016-11-15 Thread robertwb
Closes #1310


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/21b7844b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/21b7844b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/21b7844b

Branch: refs/heads/python-sdk
Commit: 21b7844bb05e9a86531876cffe8ee776bfaaa1cc
Parents: d1fccbf c1126b7
Author: Robert Bradshaw 
Authored: Tue Nov 15 14:24:03 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 14:24:03 2016 -0800

--
 .../python/apache_beam/io/datastore/__init__.py |  16 ++
 .../apache_beam/io/datastore/v1/__init__.py |  16 ++
 .../apache_beam/io/datastore/v1/helper.py   |  84 ++
 .../apache_beam/io/datastore/v1/helper_test.py  | 124 +
 .../io/datastore/v1/query_splitter.py   | 270 +++
 .../io/datastore/v1/query_splitter_test.py  | 257 ++
 sdks/python/setup.py|   1 +
 7 files changed, 768 insertions(+)
--




[2/3] incubator-beam git commit: Fix merge lint error

2016-11-15 Thread robertwb
Fix merge lint error


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8bf25269
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8bf25269
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8bf25269

Branch: refs/heads/python-sdk
Commit: 8bf2526965dd319f654e7f995df940307fb2260f
Parents: 9d805ee
Author: Robert Bradshaw 
Authored: Tue Nov 15 11:06:27 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 11:06:27 2016 -0800

--
 sdks/python/apache_beam/io/textio.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8bf25269/sdks/python/apache_beam/io/textio.py
--
diff --git a/sdks/python/apache_beam/io/textio.py 
b/sdks/python/apache_beam/io/textio.py
index 4e94f87..9c89b68 100644
--- a/sdks/python/apache_beam/io/textio.py
+++ b/sdks/python/apache_beam/io/textio.py
@@ -244,7 +244,8 @@ class ReadFromText(PTransform):
 self._strip_trailing_newlines = strip_trailing_newlines
 self._coder = coder
 self._source = _TextSource(file_pattern, min_bundle_size, compression_type,
-   strip_trailing_newlines, coder, 
validate=validate)
+   strip_trailing_newlines, coder,
+   validate=validate)
 
   def apply(self, pvalue):
 return pvalue.pipeline | Read(self._source)



[1/3] incubator-beam git commit: Display Data for: PipelineOptions, combiners, more sources

2016-11-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 384fb5dc1 -> d1fccbf5e


Display Data for: PipelineOptions, combiners, more sources
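
What ends up attached to the job can be inspected locally; a small sketch
mirroring the create_from/get_dict calls in the apiclient change below (the
flag value is made up):

from apache_beam.transforms.display import DisplayData
from apache_beam.utils.options import PipelineOptions

options = PipelineOptions(['--project', 'my-project'])
dd = DisplayData.create_from(options)
for item in dd.items:
  print(item.get_dict())  # one dict per option, with its key, type and value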


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d805eec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d805eec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d805eec

Branch: refs/heads/python-sdk
Commit: 9d805eec6b9cedb43b6e79e255483fc8fa6832d1
Parents: 384fb5d
Author: Pablo 
Authored: Wed Nov 9 14:03:03 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 11:02:28 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py   | 18 +++--
 .../apache_beam/internal/json_value_test.py |  8 +-
 sdks/python/apache_beam/io/avroio.py| 20 -
 sdks/python/apache_beam/io/avroio_test.py   | 78 
 sdks/python/apache_beam/io/fileio.py| 20 -
 sdks/python/apache_beam/io/fileio_test.py   | 40 --
 sdks/python/apache_beam/io/iobase.py|  5 +-
 sdks/python/apache_beam/io/textio.py| 25 +--
 sdks/python/apache_beam/io/textio_test.py   | 28 +++
 sdks/python/apache_beam/pipeline_test.py| 12 +--
 sdks/python/apache_beam/transforms/combiners.py | 14 
 .../apache_beam/transforms/combiners_test.py| 63 
 sdks/python/apache_beam/transforms/core.py  | 16 +++-
 .../python/apache_beam/transforms/ptransform.py |  9 +++
 sdks/python/apache_beam/utils/options.py| 17 -
 .../apache_beam/utils/pipeline_options_test.py  | 46 ++--
 sdks/python/setup.py|  3 +
 17 files changed, 375 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py 
b/sdks/python/apache_beam/internal/apiclient.py
index 5ac9d6e..8992ec3 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -32,6 +32,7 @@ from apache_beam import utils
 from apache_beam.internal.auth import get_service_credentials
 from apache_beam.internal.json_value import to_json_value
 from apache_beam.transforms import cy_combiners
+from apache_beam.transforms.display import DisplayData
 from apache_beam.utils import dependency
 from apache_beam.utils import retry
 from apache_beam.utils.dependency import get_required_container_version
@@ -234,11 +235,18 @@ class Environment(object):
   self.proto.sdkPipelineOptions = (
   dataflow.Environment.SdkPipelineOptionsValue())
 
-  for k, v in sdk_pipeline_options.iteritems():
-if v is not None:
-  self.proto.sdkPipelineOptions.additionalProperties.append(
-  dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
-  key=k, value=to_json_value(v)))
+  options_dict = {k: v
+  for k, v in sdk_pipeline_options.iteritems()
+  if v is not None}
+  self.proto.sdkPipelineOptions.additionalProperties.append(
+  dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+  key='options', value=to_json_value(options_dict)))
+
+  dd = DisplayData.create_from(options)
+  items = [item.get_dict() for item in dd.items]
+  self.proto.sdkPipelineOptions.additionalProperties.append(
+  dataflow.Environment.SdkPipelineOptionsValue.AdditionalProperty(
+  key='display_data', value=to_json_value(items)))
 
 
 class Job(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/internal/json_value_test.py
--
diff --git a/sdks/python/apache_beam/internal/json_value_test.py 
b/sdks/python/apache_beam/internal/json_value_test.py
index cfab293..a4a47b8 100644
--- a/sdks/python/apache_beam/internal/json_value_test.py
+++ b/sdks/python/apache_beam/internal/json_value_test.py
@@ -76,14 +76,8 @@ class JsonValueTest(unittest.TestCase):
 self.assertEquals(long(27), from_json_value(to_json_value(long(27))))
 
   def test_too_long_value(self):
-try:
+with self.assertRaises(TypeError):
   to_json_value(long(1 << 64))
-except TypeError as e:
-  pass
-except Exception as e:
-  self.fail('Unexpected exception raised: {}'.format(e))
-else:
-  self.fail('TypeError not raised.')
 
 
 if __name__ == '__main__':

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d805eec/sdks/python/apache_beam/io/avroio.py

[3/3] incubator-beam git commit: Closes #1264

2016-11-15 Thread robertwb
Closes #1264


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d1fccbf5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d1fccbf5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d1fccbf5

Branch: refs/heads/python-sdk
Commit: d1fccbf5eb5064a2a1a6831bb523f8cdf705c8d8
Parents: 384fb5d 8bf2526
Author: Robert Bradshaw 
Authored: Tue Nov 15 11:06:50 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 11:06:50 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py   | 18 +++--
 .../apache_beam/internal/json_value_test.py |  8 +-
 sdks/python/apache_beam/io/avroio.py| 20 -
 sdks/python/apache_beam/io/avroio_test.py   | 78 
 sdks/python/apache_beam/io/fileio.py| 20 -
 sdks/python/apache_beam/io/fileio_test.py   | 40 --
 sdks/python/apache_beam/io/iobase.py|  5 +-
 sdks/python/apache_beam/io/textio.py| 26 +--
 sdks/python/apache_beam/io/textio_test.py   | 28 +++
 sdks/python/apache_beam/pipeline_test.py| 12 +--
 sdks/python/apache_beam/transforms/combiners.py | 14 
 .../apache_beam/transforms/combiners_test.py| 63 
 sdks/python/apache_beam/transforms/core.py  | 16 +++-
 .../python/apache_beam/transforms/ptransform.py |  9 +++
 sdks/python/apache_beam/utils/options.py| 17 -
 .../apache_beam/utils/pipeline_options_test.py  | 46 ++--
 sdks/python/setup.py|  3 +
 17 files changed, 376 insertions(+), 47 deletions(-)
--




[2/3] incubator-beam git commit: Add a couple of missing coder tests.

2016-11-15 Thread robertwb
Add a couple of missing coder tests.
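
The property these tests exercise is the encode/decode round trip; a tiny
example with one of the coders touched below (values are arbitrary):

from apache_beam import coders

coder = coders.FloatCoder()
for value in (0.0, -1.5, float(2 ** 30) + 0.25):
  assert coder.decode(coder.encode(value)) == value

The check_coder helper in the test goes further, and wrapping each coder in a
TupleCoder, as the new cases do, presumably exercises the nested encoding
path as well.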


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ab02a1d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ab02a1d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ab02a1d6

Branch: refs/heads/python-sdk
Commit: ab02a1d60ce70f31c087a78a940bb4833b477ebb
Parents: c4208a8
Author: Robert Bradshaw 
Authored: Wed Nov 9 13:47:53 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 10:48:35 2016 -0800

--
 .../apache_beam/coders/coders_test_common.py| 24 
 1 file changed, 19 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ab02a1d6/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index adeb6a5..2ec8e7f 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -59,13 +59,9 @@ class CodersTest(unittest.TestCase):
'Base' not in c.__name__)
 standard -= set([coders.Coder,
  coders.FastCoder,
- coders.Base64PickleCoder,
- coders.FloatCoder,
  coders.ProtoCoder,
- coders.TimestampCoder,
  coders.ToStringCoder,
- coders.WindowCoder,
- coders.WindowedValueCoder])
+ coders.WindowCoder])
 assert not standard - cls.seen, standard - cls.seen
 assert not standard - cls.seen_nested, standard - cls.seen_nested
 
@@ -155,6 +151,9 @@ class CodersTest(unittest.TestCase):
 self.check_coder(coders.FloatCoder(),
  *[float(2 ** (0.1 * x)) for x in range(-100, 100)])
 self.check_coder(coders.FloatCoder(), float('-Inf'), float('Inf'))
+self.check_coder(
+coders.TupleCoder((coders.FloatCoder(), coders.FloatCoder())),
+(0, 1), (-100, 100), (0.5, 0.25))
 
   def test_singleton_coder(self):
 a = 'anything'
@@ -173,6 +172,9 @@ class CodersTest(unittest.TestCase):
 self.check_coder(coders.TimestampCoder(),
  timestamp.Timestamp(micros=-1234567890123456789),
  timestamp.Timestamp(micros=1234567890123456789))
+self.check_coder(
+coders.TupleCoder((coders.TimestampCoder(), coders.BytesCoder())),
+(timestamp.Timestamp.of(27), 'abc'))
 
   def test_tuple_coder(self):
 self.check_coder(
@@ -209,6 +211,18 @@ class CodersTest(unittest.TestCase):
 coders.IterableCoder(coders.VarIntCoder()))),
 (1, [1, 2, 3]))
 
+  def test_windowed_value_coder(self):
+self.check_coder(
+coders.WindowedValueCoder(coders.VarIntCoder()),
+windowed_value.WindowedValue(3, -100, ()),
+windowed_value.WindowedValue(-1, 100, (1, 2, 3)))
+self.check_coder(
+coders.TupleCoder((
+coders.WindowedValueCoder(coders.FloatCoder()),
+coders.WindowedValueCoder(coders.StrUtf8Coder()))),
+(windowed_value.WindowedValue(1.5, 0, ()),
+ windowed_value.WindowedValue("abc", 10, ('window',))))
+
   def test_proto_coder(self):
 # For instructions on how these test proto message were generated,
 # see coders_test.py



[GitHub] incubator-beam pull request #1326: Additional Coder tests

2016-11-15 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/1326




[2/2] incubator-beam git commit: Closes #1304

2016-11-15 Thread robertwb
Closes #1304


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c4208a89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c4208a89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c4208a89

Branch: refs/heads/python-sdk
Commit: c4208a899c38a92acdd95ccf37cb53237a593535
Parents: 6ac6e42 d0e3121
Author: Robert Bradshaw 
Authored: Tue Nov 15 08:53:07 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 08:53:07 2016 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 1 +
 sdks/python/apache_beam/utils/names.py | 1 +
 2 files changed, 2 insertions(+)
--




[1/2] incubator-beam git commit: Allow for passing format so that we can migrate to BQ Avro export later

2016-11-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 6ac6e420f -> c4208a899


Allow for passing format so that we can migrate to BQ Avro export later


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d0e31218
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d0e31218
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d0e31218

Branch: refs/heads/python-sdk
Commit: d0e312184e319050baa02abff2c08348b6cfb651
Parents: 6ac6e42
Author: Sourabh Bajaj 
Authored: Mon Nov 7 18:15:17 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 08:53:06 2016 -0800

--
 sdks/python/apache_beam/runners/dataflow_runner.py | 1 +
 sdks/python/apache_beam/utils/names.py | 1 +
 2 files changed, 2 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0e31218/sdks/python/apache_beam/runners/dataflow_runner.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow_runner.py 
b/sdks/python/apache_beam/runners/dataflow_runner.py
index 57867fa..00b466b 100644
--- a/sdks/python/apache_beam/runners/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow_runner.py
@@ -515,6 +515,7 @@ class DataflowPipelineRunner(PipelineRunner):
 elif transform.source.format == 'text':
   step.add_property(PropertyNames.FILE_PATTERN, transform.source.path)
 elif transform.source.format == 'bigquery':
+  step.add_property(PropertyNames.BIGQUERY_EXPORT_FORMAT, 'FORMAT_JSON')
   # TODO(silviuc): Add table validation if transform.source.validate.
   if transform.source.table_reference is not None:
 step.add_property(PropertyNames.BIGQUERY_DATASET,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d0e31218/sdks/python/apache_beam/utils/names.py
--
diff --git a/sdks/python/apache_beam/utils/names.py 
b/sdks/python/apache_beam/utils/names.py
index be8c92a..3edde3c 100644
--- a/sdks/python/apache_beam/utils/names.py
+++ b/sdks/python/apache_beam/utils/names.py
@@ -46,6 +46,7 @@ class PropertyNames(object):
   BIGQUERY_DATASET = 'dataset'
   BIGQUERY_QUERY = 'bigquery_query'
   BIGQUERY_USE_LEGACY_SQL = 'bigquery_use_legacy_sql'
+  BIGQUERY_EXPORT_FORMAT = 'bigquery_export_format'
   BIGQUERY_TABLE = 'table'
   BIGQUERY_PROJECT = 'project'
   BIGQUERY_SCHEMA = 'schema'



[1/2] incubator-beam git commit: Add IP configuration to Python SDK

2016-11-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 66f324b35 -> 6ac6e420f


Add IP configuration to Python SDK


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af6f4e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af6f4e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af6f4e90

Branch: refs/heads/python-sdk
Commit: af6f4e90a239dbf1a2ad6f0cd8974602dfa9e9b4
Parents: 66f324b
Author: Sam McVeety 
Authored: Fri Nov 11 19:56:34 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 08:50:31 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py | 9 +
 sdks/python/apache_beam/utils/options.py  | 4 
 2 files changed, 13 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6f4e90/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py 
b/sdks/python/apache_beam/internal/apiclient.py
index 8c7cc29..5ac9d6e 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -210,6 +210,15 @@ class Environment(object):
 pool.teardownPolicy = (
 dataflow.WorkerPool
 .TeardownPolicyValueValuesEnum.TEARDOWN_ON_SUCCESS)
+if self.worker_options.use_public_ips is not None:
+  if self.worker_options.use_public_ips:
+pool.ipConfiguration = (
+dataflow.WorkerPool
+.IpConfigurationValueValuesEnum.WORKER_IP_PUBLIC)
+  else:
+pool.ipConfiguration = (
+dataflow.WorkerPool
+.IpConfigurationValueValuesEnum.WORKER_IP_PRIVATE)
 
 if self.standard_options.streaming:
   # Use separate data disk for streaming.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6f4e90/sdks/python/apache_beam/utils/options.py
--
diff --git a/sdks/python/apache_beam/utils/options.py 
b/sdks/python/apache_beam/utils/options.py
index ecc85ba..f68335b 100644
--- a/sdks/python/apache_beam/utils/options.py
+++ b/sdks/python/apache_beam/utils/options.py
@@ -348,6 +348,10 @@ class WorkerOptions(PipelineOptions):
 help=
 ('The teardown policy for the VMs. By default this is left unset and '
  'the service sets the default policy.'))
+parser.add_argument(
+'--use_public_ips',
+default=None,
+help='Whether to assign public IP addresses to the worker machines.')
 
   def validate(self, validator):
 errors = []



[2/2] incubator-beam git commit: Closes #1354

2016-11-15 Thread robertwb
Closes #1354


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ac6e420
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ac6e420
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ac6e420

Branch: refs/heads/python-sdk
Commit: 6ac6e420f4f91e3326b07753004d3a56f34b4226
Parents: 66f324b af6f4e9
Author: Robert Bradshaw 
Authored: Tue Nov 15 08:50:32 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 08:50:32 2016 -0800

--
 sdks/python/apache_beam/internal/apiclient.py | 9 +
 sdks/python/apache_beam/utils/options.py  | 4 
 2 files changed, 13 insertions(+)
--




[1/2] incubator-beam git commit: Use batch GCS operations during FileSink write finalization

2016-11-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 560fe79f8 -> 66f324b35


Use batch GCS operations during FileSink write finalization
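
The core of the change is chunking rename work so that one request to GCS
carries many copy/delete operations. A stripped-down sketch of the batching
loop (the function name is made up; 100 stands in for
gcsio.MAX_BATCH_OPERATION_SIZE):

def chunk_pairs(src_dest_pairs, max_batch_size=100):
  # Group (src, dest) pairs into batches no larger than max_batch_size so a
  # single copy_batch/delete_batch round trip covers many files.
  batch = []
  for pair in src_dest_pairs:
    batch.append(pair)
    if len(batch) == max_batch_size:
      yield batch
      batch = []
  if batch:
    yield batch


pairs = [('gs://bucket/tmp-%d' % i, 'gs://bucket/final-%d' % i)
         for i in range(250)]
print([len(b) for b in chunk_pairs(pairs)])  # [100, 100, 50]

Note that each pair is appended before the size check; omitting that append
is exactly the bug fixed in the fileio.py hunk at the top of this digest.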


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/313191e1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/313191e1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/313191e1

Branch: refs/heads/python-sdk
Commit: 313191e129b884e4e14e9f503a757147d368217c
Parents: 560fe79
Author: Charles Chen 
Authored: Thu Nov 10 11:54:08 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 08:48:47 2016 -0800

--
 sdks/python/apache_beam/io/fileio.py  | 177 -
 sdks/python/apache_beam/io/fileio_test.py |   2 +-
 sdks/python/apache_beam/io/gcsio.py   |  78 +++
 sdks/python/apache_beam/io/gcsio_test.py  | 103 +-
 sdks/python/apache_beam/utils/retry.py|   3 +
 5 files changed, 298 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/313191e1/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py 
b/sdks/python/apache_beam/io/fileio.py
index 669bfc9..ef20a7c 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -31,6 +31,7 @@ import zlib
 import weakref
 
 from apache_beam import coders
+from apache_beam.io import gcsio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
@@ -451,8 +452,6 @@ class ChannelFactory(object):
   'was %s' % type(compression_type))
 
 if path.startswith('gs://'):
-  # pylint: disable=wrong-import-order, wrong-import-position
-  from apache_beam.io import gcsio
   raw_file = gcsio.GcsIO().open(
   path,
   mode,
@@ -470,40 +469,92 @@ class ChannelFactory(object):
 return isinstance(fileobj, _CompressedFile)
 
   @staticmethod
-  def rename(src, dst):
+  def rename(src, dest):
 if src.startswith('gs://'):
-  assert dst.startswith('gs://'), dst
-  # pylint: disable=wrong-import-order, wrong-import-position
-  from apache_beam.io import gcsio
-  gcsio.GcsIO().rename(src, dst)
+  if not dest.startswith('gs://'):
+raise ValueError('Destination %r must be GCS path.', dest)
+  gcsio.GcsIO().rename(src, dest)
 else:
   try:
-os.rename(src, dst)
+os.rename(src, dest)
   except OSError as err:
 raise IOError(err)
 
   @staticmethod
-  def copytree(src, dst):
+  def rename_batch(src_dest_pairs):
+# Filter out local and GCS operations.
+local_src_dest_pairs = []
+gcs_src_dest_pairs = []
+for src, dest in src_dest_pairs:
+  if src.startswith('gs://'):
+if not dest.startswith('gs://'):
+  raise ValueError('Destination %r must be GCS path.', dest)
+gcs_src_dest_pairs.append((src, dest))
+  else:
+local_src_dest_pairs.append((src, dest))
+
+# Execute local operations.
+exceptions = []
+for src, dest in local_src_dest_pairs:
+  try:
+ChannelFactory.rename(src, dest)
+  except Exception as e:  # pylint: disable=broad-except
+exceptions.append((src, dest, e))
+
+# Execute GCS operations.
+exceptions += ChannelFactory._rename_gcs_batch(gcs_src_dest_pairs)
+
+return exceptions
+
+  @staticmethod
+  def _rename_gcs_batch(src_dest_pairs):
+# Prepare batches.
+gcs_batches = []
+gcs_current_batch = []
+for src, dest in src_dest_pairs:
+  if len(gcs_current_batch) == gcsio.MAX_BATCH_OPERATION_SIZE:
+gcs_batches.append(gcs_current_batch)
+gcs_current_batch = []
+if gcs_current_batch:
+  gcs_batches.append(gcs_current_batch)
+
+# Execute GCS renames if any and return exceptions.
+exceptions = []
+for batch in gcs_batches:
+  copy_statuses = gcsio.GcsIO().copy_batch(batch)
+  copy_succeeded = []
+  for src, dest, exception in copy_statuses:
+if exception:
+  exceptions.append((src, dest, exception))
+else:
+  copy_succeeded.append((src, dest))
+  delete_batch = [src for src, dest in copy_succeeded]
+  delete_statuses = gcsio.GcsIO().delete_batch(delete_batch)
+  for i, (src, exception) in enumerate(delete_statuses):
+dest = copy_succeeded[i]
+if exception:
+  exceptions.append((src, dest, exception))
+return exceptions
+
+  @staticmethod
+  def copytree(src, dest):
 if src.startswith('gs://'):
-  assert dst.startswith('gs://'), dst
+  if not dest.startswith('gs://'):
+  

[2/2] incubator-beam git commit: Closes #1337

2016-11-15 Thread robertwb
Closes #1337


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/66f324b3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/66f324b3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/66f324b3

Branch: refs/heads/python-sdk
Commit: 66f324b350c50720cc88357bc2d56e2ecd99adc8
Parents: 560fe79 313191e
Author: Robert Bradshaw 
Authored: Tue Nov 15 08:48:48 2016 -0800
Committer: Robert Bradshaw 
Committed: Tue Nov 15 08:48:48 2016 -0800

--
 sdks/python/apache_beam/io/fileio.py  | 177 -
 sdks/python/apache_beam/io/fileio_test.py |   2 +-
 sdks/python/apache_beam/io/gcsio.py   |  78 +++
 sdks/python/apache_beam/io/gcsio_test.py  | 103 +-
 sdks/python/apache_beam/utils/retry.py|   3 +
 5 files changed, 298 insertions(+), 65 deletions(-)
--




[1/2] incubator-beam git commit: [BEAM-852] Add validation to file based sources during create time

2016-11-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 15e78b28a -> 560fe79f8


[BEAM-852] Add validation to file based sources during create time
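
A usage sketch of the new flag (file patterns are made up; the signature is
the one added to ReadFromAvro below): the flag is stored on the transform and
passed to the underlying source when the transform is applied, so a missing
input surfaces while the pipeline is being constructed instead of on a
worker.

from apache_beam.io.avroio import ReadFromAvro

# Default: the file pattern is checked as soon as the source is created
# during pipeline construction.
eager = ReadFromAvro('/tmp/existing-records-*.avro')

# Opt out when the files will only exist once an earlier stage has run.
deferred = ReadFromAvro('/tmp/not-yet-written-*.avro', validate=False)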


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76ad2929
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76ad2929
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76ad2929

Branch: refs/heads/python-sdk
Commit: 76ad29296fd57e1eec97bf40d9cf3a1d54a63a3f
Parents: 15e78b2
Author: Sourabh Bajaj 
Authored: Mon Nov 14 15:40:10 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Nov 14 15:40:10 2016 -0800

--
 sdks/python/apache_beam/io/avroio.py|  8 +++-
 sdks/python/apache_beam/io/bigquery.py  |  2 +-
 sdks/python/apache_beam/io/filebasedsource.py   | 16 +++-
 .../apache_beam/io/filebasedsource_test.py  | 41 ++--
 sdks/python/apache_beam/io/textio.py| 13 +--
 5 files changed, 60 insertions(+), 20 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/avroio.py
--
diff --git a/sdks/python/apache_beam/io/avroio.py 
b/sdks/python/apache_beam/io/avroio.py
index 53ed95a..e7e73dd 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -37,7 +37,7 @@ __all__ = ['ReadFromAvro', 'WriteToAvro']
 class ReadFromAvro(PTransform):
   """A ``PTransform`` for reading avro files."""
 
-  def __init__(self, file_pattern=None, min_bundle_size=0):
+  def __init__(self, file_pattern=None, min_bundle_size=0, validate=True):
 """Initializes ``ReadFromAvro``.
 
 Uses source '_AvroSource' to read a set of Avro files defined by a given
@@ -70,13 +70,17 @@ class ReadFromAvro(PTransform):
   file_pattern: the set of files to be read.
   min_bundle_size: the minimum size in bytes, to be considered when
splitting the input into bundles.
+  validate: flag to verify that the files exist during the pipeline
+creation time.
   **kwargs: Additional keyword arguments to be passed to the base class.
 """
 super(ReadFromAvro, self).__init__()
 self._args = (file_pattern, min_bundle_size)
+self._validate = validate
 
   def apply(self, pvalue):
-return pvalue.pipeline | Read(_AvroSource(*self._args))
+return pvalue.pipeline | Read(_AvroSource(*self._args,
+  validate=self._validate))
 
 
 class _AvroUtils(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/bigquery.py
--
diff --git a/sdks/python/apache_beam/io/bigquery.py 
b/sdks/python/apache_beam/io/bigquery.py
index f0e88a6..8d7892a 100644
--- a/sdks/python/apache_beam/io/bigquery.py
+++ b/sdks/python/apache_beam/io/bigquery.py
@@ -65,7 +65,7 @@ input entails querying the table for all its rows. The coder 
argument on
 BigQuerySource controls the reading of the lines in the export files (i.e.,
 transform a JSON object into a PCollection element). The coder is not involved
 when the same table is read as a side input since there is no intermediate
-format involved.  We get the table rows directly from the BigQuery service with
+format involved. We get the table rows directly from the BigQuery service with
 a query.
 
 Users may provide a query to read from rather than reading all of a BigQuery

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76ad2929/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py 
b/sdks/python/apache_beam/io/filebasedsource.py
index 58ad118..c7bc27e 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -50,7 +50,8 @@ class FileBasedSource(iobase.BoundedSource):
file_pattern,
min_bundle_size=0,
compression_type=fileio.CompressionTypes.AUTO,
-   splittable=True):
+   splittable=True,
+   validate=True):
 """Initializes ``FileBasedSource``.
 
 Args:
@@ -68,10 +69,13 @@ class FileBasedSource(iobase.BoundedSource):
   the file, for example, for compressed files where currently
   it is not possible to efficiently read a data range without
   decompressing the whole file.
+  validate: Boolean flag to verify that the files exist during the pipeline
+creation time.
 Raises:
   TypeError: when 

[2/2] incubator-beam git commit: Closes #1220

2016-11-14 Thread robertwb
Closes #1220


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/560fe79f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/560fe79f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/560fe79f

Branch: refs/heads/python-sdk
Commit: 560fe79f875b9d08b7256e6bf653a48b19f0ccb5
Parents: 15e78b2 76ad292
Author: Robert Bradshaw 
Authored: Mon Nov 14 15:41:00 2016 -0800
Committer: Robert Bradshaw 
Committed: Mon Nov 14 15:41:00 2016 -0800

--
 sdks/python/apache_beam/io/avroio.py|  8 +++-
 sdks/python/apache_beam/io/bigquery.py  |  2 +-
 sdks/python/apache_beam/io/filebasedsource.py   | 16 +++-
 .../apache_beam/io/filebasedsource_test.py  | 41 ++--
 sdks/python/apache_beam/io/textio.py| 13 +--
 5 files changed, 60 insertions(+), 20 deletions(-)
--




[2/2] incubator-beam git commit: Closes #1349

2016-11-11 Thread robertwb
Closes #1349


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/15e78b28
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/15e78b28
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/15e78b28

Branch: refs/heads/python-sdk
Commit: 15e78b28a63f0987d7e361f5f5b4c9b6be532316
Parents: 6fb4169 1fc9f70
Author: Robert Bradshaw 
Authored: Fri Nov 11 11:51:21 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 11 11:51:21 2016 -0800

--
 sdks/python/apache_beam/utils/windowed_value.pxd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Remove the inline from WindowedValue.create()

2016-11-11 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 6fb416989 -> 15e78b28a


Remove the inline from WindowedValue.create()


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1fc9f70b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1fc9f70b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1fc9f70b

Branch: refs/heads/python-sdk
Commit: 1fc9f70bfeec18d62a4141a44f3bbd40151efd85
Parents: 6fb4169
Author: Ahmet Altay 
Authored: Fri Nov 11 11:43:18 2016 -0800
Committer: Ahmet Altay 
Committed: Fri Nov 11 11:43:18 2016 -0800

--
 sdks/python/apache_beam/utils/windowed_value.pxd | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/1fc9f70b/sdks/python/apache_beam/utils/windowed_value.pxd
--
diff --git a/sdks/python/apache_beam/utils/windowed_value.pxd 
b/sdks/python/apache_beam/utils/windowed_value.pxd
index 8799914..41c2986 100644
--- a/sdks/python/apache_beam/utils/windowed_value.pxd
+++ b/sdks/python/apache_beam/utils/windowed_value.pxd
@@ -34,5 +34,5 @@ cdef class WindowedValue(object):
   cdef inline bint _typed_eq(WindowedValue left, WindowedValue right) except? 
-2
 
 @cython.locals(wv=WindowedValue)
-cdef inline WindowedValue create(
+cdef WindowedValue create(
   object value, int64_t timestamp_micros, object windows)



[1/2] incubator-beam git commit: Closes #1346

2016-11-11 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 778194f22 -> 6fb416989


Closes #1346


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6fb41698
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6fb41698
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6fb41698

Branch: refs/heads/python-sdk
Commit: 6fb416989128519cb541bced107facae9012f51c
Parents: 778194f 6f93cd5
Author: Robert Bradshaw 
Authored: Fri Nov 11 00:23:45 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 11 00:23:45 2016 -0800

--
 .../examples/snippets/snippets_test.py  | 14 ++---
 sdks/python/apache_beam/pipeline_test.py|  5 +++
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 .../apache_beam/runners/direct/executor.py  | 33 
 .../runners/direct/transform_evaluator.py   |  8 +++--
 5 files changed, 41 insertions(+), 21 deletions(-)
--




[2/2] incubator-beam git commit: DirectPipelineRunner bug fixes.

2016-11-11 Thread robertwb
DirectPipelineRunner bug fixes.

- Execute empty [] | pipelines to the end.
- Use pickler to serialize/deserialize DoFns instead of deepcopy,
  similar to the other execution environments (see the sketch below).
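
A toy round trip showing what the pickler swap buys (the DoFn-like class is
made up, and the import path for the SDK's pickler module is assumed):
running a copy produced by dumps/loads exercises the same serialization path
as the remote runners, so unpicklable DoFn state is caught locally instead of
only on the service.

from apache_beam.internal import pickler


class AddOneFn(object):
  # Stand-in for a DoFn; anything it holds must survive dumps/loads.
  def process(self, element):
    return [element + 1]


fn_copy = pickler.loads(pickler.dumps(AddOneFn()))
assert fn_copy.process(41) == [42]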


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f93cd58
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f93cd58
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f93cd58

Branch: refs/heads/python-sdk
Commit: 6f93cd5884797c0880766c7737e106765becf96d
Parents: 778194f
Author: Ahmet Altay 
Authored: Thu Nov 10 17:37:45 2016 -0800
Committer: Robert Bradshaw 
Committed: Fri Nov 11 00:23:45 2016 -0800

--
 .../examples/snippets/snippets_test.py  | 14 ++---
 sdks/python/apache_beam/pipeline_test.py|  5 +++
 .../apache_beam/runners/direct/direct_runner.py |  2 +-
 .../apache_beam/runners/direct/executor.py  | 33 
 .../runners/direct/transform_evaluator.py   |  8 +++--
 5 files changed, 41 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/examples/snippets/snippets_test.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py 
b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index edc0a17..72fccb2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -29,6 +29,8 @@ from apache_beam import io
 from apache_beam import pvalue
 from apache_beam import typehints
 from apache_beam.io import fileio
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import equal_to
 from apache_beam.utils.options import TypeOptions
 from apache_beam.examples.snippets import snippets
 
@@ -307,7 +309,9 @@ class TypeHintsTest(unittest.TestCase):
   # [END type_hints_runtime_on]
 
   def test_deterministic_key(self):
-lines = ['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 
'zucchini,veg,3']
+p = beam.Pipeline('DirectPipelineRunner')
+lines = (p | beam.Create(
+['banana,fruit,3', 'kiwi,fruit,2', 'kiwi,fruit,2', 'zucchini,veg,3']))
 
 # [START type_hints_deterministic_key]
 class Player(object):
@@ -338,9 +342,11 @@ class TypeHintsTest(unittest.TestCase):
 beam.typehints.Tuple[Player, int]))
 # [END type_hints_deterministic_key]
 
-self.assertEquals(
-{('banana', 3), ('kiwi', 4), ('zucchini', 3)},
-set(totals | beam.Map(lambda (k, v): (k.name, v))))
+assert_that(
+totals | beam.Map(lambda (k, v): (k.name, v)),
+equal_to([('banana', 3), ('kiwi', 4), ('zucchini', 3)]))
+
+p.run()
 
 
 class SnippetsTest(unittest.TestCase):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py 
b/sdks/python/apache_beam/pipeline_test.py
index a4c983f..013796c 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -24,6 +24,7 @@ from apache_beam.pipeline import Pipeline
 from apache_beam.pipeline import PipelineOptions
 from apache_beam.pipeline import PipelineVisitor
 from apache_beam.runners.dataflow.native_io.iobase import NativeSource
+from apache_beam.transforms import CombineGlobally
 from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Map
@@ -217,6 +218,10 @@ class PipelineTest(unittest.TestCase):
 
 pipeline.run()
 
+  def test_aggregator_empty_input(self):
+actual = [] | CombineGlobally(max).without_defaults()
+self.assertEqual(actual, [])
+
   def test_pipeline_as_context(self):
 def raise_exception(exn):
   raise exn

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f93cd58/sdks/python/apache_beam/runners/direct/direct_runner.py
--
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner.py 
b/sdks/python/apache_beam/runners/direct/direct_runner.py
index 2e5fe74..1afd486 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner.py
@@ -125,7 +125,7 @@ class BufferingInMemoryCache(object):
 for key, value in self._cache.iteritems():
   applied_ptransform, tag = key
   self._pvalue_cache.cache_output(applied_ptransform, tag, value)
-  self._cache = None
+self._cache = None
 
 
 class DirectPipelineResult(PipelineResult):


[2/2] incubator-beam git commit: Closes #1330

2016-11-10 Thread robertwb
Closes #1330


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/778194f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/778194f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/778194f2

Branch: refs/heads/python-sdk
Commit: 778194f22ae850195c270991499f988f8fe50972
Parents: ec00c53 4827ae8
Author: Robert Bradshaw 
Authored: Thu Nov 10 14:05:02 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Nov 10 14:05:02 2016 -0800

--
 sdks/python/run_postcommit.sh | 3 +++
 1 file changed, 3 insertions(+)
--




[1/2] incubator-beam git commit: Remove tox cache from previous workspace

2016-11-10 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk ec00c530c -> 778194f22


Remove tox cache from previous workspace

Jenkins doesn't clean up the previous workspace, and since .tox
is listed in .gitignore, we must explicitly delete it.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4827ae84
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4827ae84
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4827ae84

Branch: refs/heads/python-sdk
Commit: 4827ae840797eb43aa7e4265dad7112804b5fb85
Parents: ec00c53
Author: Vikas Kedigehalli 
Authored: Wed Nov 9 17:25:15 2016 -0800
Committer: Robert Bradshaw 
Committed: Thu Nov 10 14:05:01 2016 -0800

--
 sdks/python/run_postcommit.sh | 3 +++
 1 file changed, 3 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4827ae84/sdks/python/run_postcommit.sh
--
diff --git a/sdks/python/run_postcommit.sh b/sdks/python/run_postcommit.sh
index 23dd516..2cd40da 100755
--- a/sdks/python/run_postcommit.sh
+++ b/sdks/python/run_postcommit.sh
@@ -31,6 +31,9 @@ set -v
 # pip install --user installation location.
 LOCAL_PATH=$HOME/.local/bin/
 
+# Remove any tox cache from previous workspace
+rm -rf sdks/python/.tox
+
 # INFRA does not install virtualenv
 pip install virtualenv --user
 



[GitHub] incubator-beam pull request #1326: Additional Coder tests

2016-11-09 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/1326

Additional Coder tests

I noticed these were missing while merging another PR. 

R: @mariapython

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam coder-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1326.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1326


commit 4f8c3060266fc857908187e365f8a44b49fdee00
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-11-09T21:47:53Z

Add a couple of missing coder tests.

commit 3f8b2c7e52448798c166751cd74be54518650ee9
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-11-09T21:56:31Z

Also check coder determinism.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #1253: Optimize WindowedValueCoder

2016-11-09 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/1253




[2/2] incubator-beam git commit: Closes #1253

2016-11-09 Thread robertwb
Closes #1253


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ec00c530
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ec00c530
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ec00c530

Branch: refs/heads/python-sdk
Commit: ec00c530c9a54ce61095214fcce7b69a7c653d95
Parents: ea64242 0c90fb8
Author: Robert Bradshaw 
Authored: Wed Nov 9 13:26:45 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Nov 9 13:26:45 2016 -0800

--
 sdks/python/apache_beam/coders/coder_impl.pxd   |  4 
 sdks/python/apache_beam/coders/coder_impl.py| 21 ++--
 .../apache_beam/coders/coders_test_common.py|  8 ++--
 3 files changed, 21 insertions(+), 12 deletions(-)
--




[1/2] incubator-beam git commit: Optimize WindowedValueCoder

2016-11-09 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk ea642428f -> ec00c530c


Optimize WindowedValueCoder


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0c90fb80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0c90fb80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0c90fb80

Branch: refs/heads/python-sdk
Commit: 0c90fb80f3961848aa82667e8891cfebf4dbc351
Parents: ea64242
Author: Robert Bradshaw <rober...@google.com>
Authored: Tue Nov 1 16:12:19 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Wed Nov 9 13:26:44 2016 -0800

--
 sdks/python/apache_beam/coders/coder_impl.pxd   |  4 
 sdks/python/apache_beam/coders/coder_impl.py| 21 ++--
 .../apache_beam/coders/coders_test_common.py|  8 ++--
 3 files changed, 21 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.pxd
--
diff --git a/sdks/python/apache_beam/coders/coder_impl.pxd 
b/sdks/python/apache_beam/coders/coder_impl.pxd
index e021c2e..7ff 100644
--- a/sdks/python/apache_beam/coders/coder_impl.pxd
+++ b/sdks/python/apache_beam/coders/coder_impl.pxd
@@ -26,6 +26,7 @@ cimport libc.stdlib
 cimport libc.string
 
 from .stream cimport InputStream, OutputStream
+from apache_beam.utils cimport windowed_value
 
 
 cdef object loads, dumps, create_InputStream, create_OutputStream, 
ByteCountingOutputStream, get_varint_size
@@ -137,3 +138,6 @@ cdef class WindowedValueCoderImpl(StreamCoderImpl):
 
   @cython.locals(c=CoderImpl)
   cpdef get_estimated_size_and_observables(self, value, bint nested=?)
+
+  @cython.locals(wv=windowed_value.WindowedValue)
+  cpdef encode_to_stream(self, value, OutputStream stream, bint nested)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coder_impl.py
--
diff --git a/sdks/python/apache_beam/coders/coder_impl.py 
b/sdks/python/apache_beam/coders/coder_impl.py
index d075814..47a837f 100644
--- a/sdks/python/apache_beam/coders/coder_impl.py
+++ b/sdks/python/apache_beam/coders/coder_impl.py
@@ -29,7 +29,7 @@ from types import NoneType
 
 from apache_beam.coders import observable
 from apache_beam.utils.timestamp import Timestamp
-from apache_beam.utils.windowed_value import WindowedValue
+from apache_beam.utils import windowed_value
 
 # pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
 try:
@@ -535,19 +535,28 @@ class WindowedValueCoderImpl(StreamCoderImpl):
   """A coder for windowed values."""
 
   def __init__(self, value_coder, timestamp_coder, window_coder):
+# TODO(robertwb): Do we need the ability to customize timestamp_coder?
 self._value_coder = value_coder
 self._timestamp_coder = timestamp_coder
 self._windows_coder = TupleSequenceCoderImpl(window_coder)
 
   def encode_to_stream(self, value, out, nested):
-self._value_coder.encode_to_stream(value.value, out, True)
-self._timestamp_coder.encode_to_stream(value.timestamp, out, True)
-self._windows_coder.encode_to_stream(value.windows, out, True)
+wv = value  # type cast
+self._value_coder.encode_to_stream(wv.value, out, True)
+if isinstance(self._timestamp_coder, TimestampCoderImpl):
+  # Avoid creation of Timestamp object.
+  out.write_bigendian_int64(wv.timestamp_micros)
+else:
+  self._timestamp_coder.encode_to_stream(wv.timestamp, out, True)
+self._windows_coder.encode_to_stream(wv.windows, out, True)
 
   def decode_from_stream(self, in_stream, nested):
-return WindowedValue(
+return windowed_value.create(
 self._value_coder.decode_from_stream(in_stream, True),
-self._timestamp_coder.decode_from_stream(in_stream, True),
+# Avoid creation of Timestamp object.
+in_stream.read_bigendian_int64()
+if isinstance(self._timestamp_coder, TimestampCoderImpl)
+else self._timestamp_coder.decode_from_stream(in_stream, True).micros,
 self._windows_coder.decode_from_stream(in_stream, True))
 
   def get_estimated_size_and_observables(self, value, nested=False):

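The change above avoids constructing a Timestamp wrapper in the hot encode/decode path by writing and reading the raw microsecond int64 directly whenever the standard TimestampCoderImpl is in use. A self-contained sketch of that general pattern, using only the standard library (hypothetical illustration, not Beam code):

import struct

class Timestamp(object):
    # Stand-in for a timestamp wrapper; constructing one per element is the
    # cost the optimization avoids.
    def __init__(self, micros):
        self.micros = micros

def encode_timestamp_fast(buf, timestamp_micros):
    # Fast path: write the raw big-endian int64, no wrapper object needed.
    buf.append(struct.pack('>q', timestamp_micros))

def encode_timestamp_generic(buf, timestamp):
    # Generic path: accepts a Timestamp object and unwraps it.
    buf.append(struct.pack('>q', timestamp.micros))

def decode_timestamp_fast(data):
    # Read the raw int64 back; callers can wrap it lazily if they need to.
    (micros,) = struct.unpack('>q', data)
    return micros

buf = []
encode_timestamp_fast(buf, 1478736000000000)
assert decode_timestamp_fast(buf[0]) == 1478736000000000
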
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0c90fb80/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py 
b/sdks/python/apache_beam/coders/coders_test_common.py
index 1af8347..adeb6a5 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_bea

[GitHub] incubator-beam pull request #1323: Testing pr 1212

2016-11-09 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/1323




[2/2] incubator-beam git commit: Closes #1212

2016-11-09 Thread robertwb
Closes #1212


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ea642428
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ea642428
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ea642428

Branch: refs/heads/python-sdk
Commit: ea642428f3abf8e074ee52092d10616e8ad0c33a
Parents: cf026bb 3c1043a
Author: Robert Bradshaw 
Authored: Wed Nov 9 13:24:58 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Nov 9 13:24:58 2016 -0800

--
 sdks/python/apache_beam/io/bigquery.py  |  32 +
 sdks/python/apache_beam/io/bigquery_test.py |  68 -
 sdks/python/apache_beam/io/filebasedsource.py   |   6 +
 .../apache_beam/io/filebasedsource_test.py  |  44 --
 sdks/python/apache_beam/io/fileio.py|  20 +++
 sdks/python/apache_beam/io/fileio_test.py   |  62 -
 sdks/python/apache_beam/io/iobase.py|  12 +-
 sdks/python/apache_beam/io/pubsub.py|  15 ++
 sdks/python/apache_beam/io/pubsub_test.py   |  62 +
 .../runners/dataflow/native_io/iobase.py|   5 +-
 sdks/python/apache_beam/transforms/core.py  |  24 +++-
 sdks/python/apache_beam/transforms/display.py   |  72 +++---
 .../apache_beam/transforms/display_test.py  | 138 ++-
 .../apache_beam/transforms/ptransform_test.py   |  58 
 14 files changed, 548 insertions(+), 70 deletions(-)
--




[1/2] incubator-beam git commit: Adding display data to sink, sources, and parallel-do operations.

2016-11-09 Thread robertwb
ache_beam/runners/dataflow/native_io/iobase.py
--
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py 
b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
index 9621f4c..32da3a2 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase.py
@@ -22,6 +22,7 @@ import logging
 
 from apache_beam import pvalue
 from apache_beam.transforms import ptransform
+from apache_beam.transforms.display import HasDisplayData
 
 
 def _dict_printable_fields(dict_object, skip_fields):
@@ -38,7 +39,7 @@ _minor_fields = ['coder', 'key_coder', 'value_coder',
  'compression_type']
 
 
-class NativeSource(object):
+class NativeSource(HasDisplayData):
   """A source implemented by Dataflow service.
 
   This class is to be only inherited by sources natively implemented by Cloud
@@ -244,7 +245,7 @@ class DynamicSplitResultWithPosition(DynamicSplitResult):
 self.stop_position = stop_position
 
 
-class NativeSink(object):
+class NativeSink(HasDisplayData):
   """A sink implemented by Dataflow service.
 
   This class is to be only inherited by sinks natively implemented by Cloud

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py 
b/sdks/python/apache_beam/transforms/core.py
index 3b5816a..3189de7 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -29,7 +29,7 @@ from apache_beam.coders import typecoders
 from apache_beam.internal import util
 from apache_beam.transforms import ptransform
 from apache_beam.transforms import window
-from apache_beam.transforms.display import HasDisplayData
+from apache_beam.transforms.display import HasDisplayData, DisplayDataItem
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.ptransform import PTransformWithSideInputs
 from apache_beam.transforms.window import MIN_TIMESTAMP
@@ -235,6 +235,16 @@ class CallableWrapperDoFn(DoFn):
 
 super(CallableWrapperDoFn, self).__init__()
 
+  def display_data(self):
+# If the callable has a name, then it's likely a function, and
+# we show its name.
+# Otherwise, it might be an instance of a callable class. We
+# show its class.
+display_data_value = (self._fn.__name__ if hasattr(self._fn, '__name__')
+  else self._fn.__class__)
+return {'fn': DisplayDataItem(display_data_value,
+  label='Transform Function')}
+
   def __repr__(self):
 return 'CallableWrapperDoFn(%s)' % self._fn
 
@@ -580,6 +590,11 @@ class ParDo(PTransformWithSideInputs):
   def process_argspec_fn(self):
 return self.fn.process_argspec_fn()
 
+  def display_data(self):
+return {'fn': DisplayDataItem(self.fn.__class__,
+  label='Transform Function'),
+'fn_dd': self.fn}
+
   def apply(self, pcoll):
 self.side_output_tags = set()
 # TODO(robertwb): Change all uses of the dofn attribute to use fn instead.
@@ -696,6 +711,10 @@ def Map(fn_or_label, *args, **kwargs):  # pylint: 
disable=invalid-name
   else:
 wrapper = lambda x: [fn(x)]
 
+  # TODO. What about callable classes?
+  if hasattr(fn, '__name__'):
+wrapper.__name__ = fn.__name__
+
   # Proxy the type-hint information from the original function to this new
   # wrapped function.
   get_type_hints(wrapper).input_types = get_type_hints(fn).input_types
@@ -739,6 +758,9 @@ def Filter(fn_or_label, *args, **kwargs):  # pylint: 
disable=invalid-name
 % (fn, 'first' if label is None else 'second'))
   wrapper = lambda x, *args, **kwargs: [x] if fn(x, *args, **kwargs) else []
 
+  # TODO: What about callable classes?
+  if hasattr(fn, '__name__'):
+wrapper.__name__ = fn.__name__
   # Proxy the type-hint information from the function being wrapped, setting 
the
   # output type to be the same as the input type.
   get_type_hints(wrapper).input_types = get_type_hints(fn).input_types

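The display_data() hook added above lets a transform or DoFn describe its configuration to monitoring UIs by returning a dict of DisplayDataItem values. A minimal sketch of how user code might use the same hook, assuming only the HasDisplayData and DisplayDataItem classes shown in this diff (hypothetical example, not part of the commit):

from apache_beam.transforms.display import DisplayDataItem, HasDisplayData

class ThresholdFilterFn(HasDisplayData):
    # Hypothetical user object; only the display_data() override matters here.
    def __init__(self, threshold):
        self.threshold = threshold

    def display_data(self):
        # Dict keys become display data keys; the label is what UIs render.
        return {'threshold': DisplayDataItem(self.threshold,
                                             label='Score Threshold')}

Items returned this way are picked up whenever display data is collected for the object, alongside the 'fn' item that ParDo now reports for its DoFn.
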
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3c1043ae/sdks/python/apache_beam/transforms/display.py
--
diff --git a/sdks/python/apache_beam/transforms/display.py 
b/sdks/python/apache_beam/transforms/display.py
index e93d560..365abaf 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -93,6 +93,8 @@ class DisplayData(object):
 continue
 
   if isinstance(element, DisplayDataItem):
+if element.should_drop():
+  continue
 element.key = key
 element.namespace = self.namespace
 self.items.append(element)
@@ -132,6 +134,

[GitHub] incubator-beam pull request #1323: Testing pr 1212

2016-11-09 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/1323

Testing pr 1212



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam github-pr-1212

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1323.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1323


commit 51dc6d70bad0b537738cbee3338a57dbc58fb1bd
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-11-09T19:49:43Z

Closes #1321






[1/2] incubator-beam git commit: Add hamcrest dependency.

2016-11-09 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk aa603872c -> cf026bb55


Add hamcrest dependency.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6bc59fd1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6bc59fd1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6bc59fd1

Branch: refs/heads/python-sdk
Commit: 6bc59fd12b5e2911f501aefdd8083c6bf65d001b
Parents: aa60387
Author: Robert Bradshaw 
Authored: Wed Nov 9 10:51:58 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Nov 9 10:51:58 2016 -0800

--
 sdks/python/setup.py | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6bc59fd1/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 1aa3eb3..4010b06 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -97,6 +97,10 @@ REQUIRED_PACKAGES = [
 ]
 
 
+REQUIRED_TEST_PACKAGES = [
+'pyhamcrest>=1.9,<2.0',
+]
+
 setuptools.setup(
 name=PACKAGE_NAME,
 version=PACKAGE_VERSION,
@@ -119,6 +123,7 @@ setuptools.setup(
 setup_requires=['nose>=1.0'],
 install_requires=REQUIRED_PACKAGES,
 test_suite='nose.collector',
+tests_require=REQUIRED_TEST_PACKAGES,
 zip_safe=False,
 # PyPI package information.
 classifiers=[


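PyHamcrest provides matcher-style assertions for tests; a small illustration of the idiom the new tests_require entry makes available (hypothetical snippet, not part of the commit):

from hamcrest import assert_that, contains_inanyorder, equal_to

# Matchers compose and report descriptive failure messages.
assert_that([3, 1, 2], contains_inanyorder(1, 2, 3))
assert_that(2 + 2, equal_to(4))
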

[2/2] incubator-beam git commit: Closes #1321

2016-11-09 Thread robertwb
Closes #1321


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cf026bb5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cf026bb5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cf026bb5

Branch: refs/heads/python-sdk
Commit: cf026bb55362f2faa843374bcdd67b7e020b8f87
Parents: aa60387 6bc59fd
Author: Robert Bradshaw 
Authored: Wed Nov 9 11:49:43 2016 -0800
Committer: Robert Bradshaw 
Committed: Wed Nov 9 11:49:43 2016 -0800

--
 sdks/python/setup.py | 5 +
 1 file changed, 5 insertions(+)
--




[GitHub] incubator-beam pull request #1321: Add hamcrest dependency.

2016-11-09 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/1321

Add hamcrest dependency.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam hamcrest

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/1321.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #1321


commit 6bc59fd12b5e2911f501aefdd8083c6bf65d001b
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-11-09T18:51:58Z

Add hamcrest dependency.






[4/5] incubator-beam git commit: Renames InprocessPipelineRunner to DirectPipelineRunner and removes the existing DirectPipelineRunner. Renamed the folder to direct to keep all related files in the sa

2016-11-07 Thread robertwb
# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""DirectPipelineRunner, executing on the local machine.
-
-The DirectPipelineRunner class implements what is called in Dataflow
-parlance the "direct runner". Such a runner executes the entire graph
-of transformations belonging to a pipeline on the local machine.
-"""
-
-from __future__ import absolute_import
-
-import collections
-import itertools
-import logging
-
-from apache_beam import coders
-from apache_beam import error
-from apache_beam.runners.common import DoFnRunner
-from apache_beam.runners.common import DoFnState
-from apache_beam.runners.runner import PipelineResult
-from apache_beam.runners.runner import PipelineRunner
-from apache_beam.runners.runner import PipelineState
-from apache_beam.runners.runner import PValueCache
-from apache_beam.transforms import sideinputs
-from apache_beam.transforms.window import GlobalWindows
-from apache_beam.transforms.window import WindowedValue
-from apache_beam.typehints.typecheck import OutputCheckWrapperDoFn
-from apache_beam.typehints.typecheck import TypeCheckError
-from apache_beam.typehints.typecheck import TypeCheckWrapperDoFn
-from apache_beam.utils import counters
-from apache_beam.utils.options import TypeOptions
-
-
-class DirectPipelineRunner(PipelineRunner):
-  """A local pipeline runner.
-
-  The runner computes everything locally and does not make any attempt to
-  optimize for time or space.
-  """
-
-  def __init__(self, cache=None):
-# Cache of values computed while the runner executes a pipeline.
-self._cache = cache if cache is not None else PValueCache()
-self._counter_factory = counters.CounterFactory()
-# Element counts used for debugging footprint issues in the direct runner.
-# The values computed are used only for logging and do not take part in
-# any decision making logic. The key for the counter dictionary is either
-# the full label for the transform producing the elements or a tuple
-# (full label, output tag) for ParDo transforms since they can output 
values
-# on multiple outputs.
-self.debug_counters = {}
-self.debug_counters['element_counts'] = collections.Counter()
-
-  @property
-  def cache(self):
-return self._cache
-
-  def get_pvalue(self, pvalue):
-"""Gets the PValue's computed value from the runner's cache."""
-try:
-  return self._cache.get_pvalue(pvalue)
-except KeyError:
-  raise error.PValueError('PValue is not computed.')
-
-  def clear_pvalue(self, pvalue):
-"""Removes a PValue from the runner's cache."""
-self._cache.clear_pvalue(pvalue)
-
-  def skip_if_cached(func):  # pylint: disable=no-self-argument
-"""Decorator to skip execution of a transform if value is cached."""
-
-def func_wrapper(self, pvalue, *args, **kwargs):
-  logging.debug('Current: Debug counters: %s', self.debug_counters)
-  if self._cache.is_cached(pvalue):  # pylint: disable=protected-access
-return
-  else:
-func(self, pvalue, *args, **kwargs)
-return func_wrapper
-
-  def run(self, pipeline):
-super(DirectPipelineRunner, self).run(pipeline)
-logging.info('Final: Debug counters: %s', self.debug_counters)
-return DirectPipelineResult(state=PipelineState.DONE,
-counter_factory=self._counter_factory)
-
-  @skip_if_cached
-  def run_CreatePCollectionView(self, transform_node):
-transform = transform_node.transform
-view = transform.view
-values = self._cache.get_pvalue(transform_node.inputs[0])
-    result = sideinputs.SideInputMap(type(view), view._view_options(), values)
-self._cache.cache_output(transform_node, result)
-
-  @skip_if_cached
-  def run_ParDo(self, transform_node):
-transform = transform_node.transform
-
-side_inputs = [self._cache.get_pvalue(view)
-   for view in transform_node.side_inputs]
-
-# TODO(robertwb): Do this type checking inside DoFnRunner to get it on
-# remote workers as well?
-options = transform_node.inputs[0].pipeline.options
-if options is not None and options.view_as(TypeOptions).runtime_type_check:
-  transform.dofn = TypeCheckWrapperDoFn(
-  transform.dofn, transform.get_type_hints())
-
-# TODO(robertwb): Should this be conditionally done on the workers as well?
-transform.dofn = OutputChec

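The deleted runner above includes a small skip_if_cached decorator that short-circuits re-execution of a transform whose output is already cached. A standalone sketch of that memoization-guard pattern (hypothetical, not the Beam implementation):

import functools

def skip_if_cached(func):
    @functools.wraps(func)
    def wrapper(self, key, *args, **kwargs):
        if key in self._cache:
            return  # already computed, skip the work
        func(self, key, *args, **kwargs)
    return wrapper

class Runner(object):
    def __init__(self):
        self._cache = {}

    @skip_if_cached
    def run_step(self, key):
        self._cache[key] = 'computed-%s' % key

r = Runner()
r.run_step('a')
r.run_step('a')  # the second call is skipped by the decorator
assert r._cache == {'a': 'computed-a'}
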
[2/5] incubator-beam git commit: Add a test to check memory consumption of the direct runner.

2016-11-07 Thread robertwb
Add a test to check memory consumption of the direct runner.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d021e9ce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d021e9ce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d021e9ce

Branch: refs/heads/python-sdk
Commit: d021e9ce7bccb770e187c720bd7a95bb99f1bc8e
Parents: 7f201cb
Author: Ahmet Altay 
Authored: Thu Nov 3 15:37:49 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Nov 7 17:56:49 2016 -0800

--
 sdks/python/apache_beam/pipeline_test.py | 37 +++
 1 file changed, 37 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/d021e9ce/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py 
b/sdks/python/apache_beam/pipeline_test.py
index db3ad9e..a4c983f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -180,6 +180,43 @@ class PipelineTest(unittest.TestCase):
 ['a-x', 'b-x', 'c-x'],
 sorted(['a', 'b', 'c'] | 'AddSuffix' >> AddSuffix('-x')))
 
+  def test_memory_usage(self):
+try:
+  import resource
+except ImportError:
+  # Skip the test if resource module is not available (e.g. non-Unix os).
+  self.skipTest('resource module not available.')
+
+def get_memory_usage_in_bytes():
+  return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss * (2 ** 10)
+
+def check_memory(value, memory_threshold):
+  memory_usage = get_memory_usage_in_bytes()
+  if memory_usage > memory_threshold:
+raise RuntimeError(
+'High memory usage: %d > %d' % (memory_usage, memory_threshold))
+  return value
+
+len_elements = 100
+num_elements = 10
+num_maps = 100
+
+pipeline = Pipeline('DirectPipelineRunner')
+
+# Consumed memory should not be proportional to the number of maps.
+memory_threshold = (
+get_memory_usage_in_bytes() + (3 * len_elements * num_elements))
+
+biglist = pipeline | 'oom:create' >> Create(
+['x' * len_elements] * num_elements)
+for i in range(num_maps):
+  biglist = biglist | ('oom:addone-%d' % i) >> Map(lambda x: x + 'y')
+result = biglist | 'oom:check' >> Map(check_memory, memory_threshold)
+assert_that(result, equal_to(
+['x' * len_elements + 'y' * num_maps] * num_elements))
+
+pipeline.run()
+
   def test_pipeline_as_context(self):
 def raise_exception(exn):
   raise exn

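The test above derives a peak-RSS figure from resource.getrusage. A portable helper would also have to account for the fact that ru_maxrss is reported in kilobytes on Linux (hence the 2 ** 10 factor in the test) but in bytes on macOS; a hedged sketch:

import resource
import sys

def max_rss_bytes():
    # ru_maxrss units differ by platform: bytes on macOS, kilobytes on most
    # other Unix systems (notably Linux).
    rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
    return rss if sys.platform == 'darwin' else rss * (2 ** 10)

print('peak RSS so far: %d bytes' % max_rss_bytes())
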

