incubator-beam-site git commit: Remove duplicate paragraph.

2016-05-27 Thread robertwb
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site 87d9a07fe -> edd0705b8


Remove duplicate paragraph.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/edd0705b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/edd0705b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/edd0705b

Branch: refs/heads/asf-site
Commit: edd0705b82799a8e000c995da9c099023f2ccaa8
Parents: 87d9a07
Author: Robert Bradshaw 
Authored: Fri May 27 11:18:51 2016 -0700
Committer: Robert Bradshaw 
Committed: Fri May 27 11:22:21 2016 -0700

--
 _posts/2016-05-20-where-is-my-pcollection-dot-map.md | 2 --
 content/blog/2016/05/27/where-is-my-pcollection-dot-map.html | 2 --
 2 files changed, 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/edd0705b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
--
diff --git a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
index f7ea468..e9232d3 100644
--- a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
+++ b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
@@ -15,8 +15,6 @@ Though Beam is relatively new, its design draws heavily on many years of experie
 
 The original FlumeJava API has methods like `count` and `parallelDo` on the PCollections. Though slightly more succinct, this approach has many disadvantages to extensibility. Every new user to FlumeJava wanted to add transforms, and adding them as methods to PCollection simply doesn't scale well. In contrast, a PCollection in Beam has a single `apply` method which takes any PTransform as an argument.
 
-Have you ever wondered why Beam has PTransforms for everything instead of having methods on PCollection? Take a look at the history that led to this (and other) design decisions.
-
 
   
 FlumeJava

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/edd0705b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
--
diff --git a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
index 9d69ac3..2af12f5 100644
--- a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
+++ b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
@@ -112,8 +112,6 @@
 
 The original FlumeJava API has methods like count and parallelDo on the PCollections. Though slightly more succinct, this approach has many disadvantages to extensibility. Every new user to FlumeJava wanted to add transforms, and adding them as methods to PCollection simply doesn’t scale well. In contrast, a PCollection in Beam has a single apply method which takes any PTransform as an argument.
 
-Have you ever wondered why Beam has PTransforms for everything instead of having methods on PCollection? Take a look at the history that led to this (and other) design decisions.
-
 
   
 FlumeJava

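The blog paragraph in this diff argues that a single `apply` method scales better than adding one method per transform to PCollection. A minimal Python sketch of that design follows. It is a toy in-memory model, not Beam's implementation; the class names and the `expand` hook are assumptions chosen for illustration. The point is that `CountLongWords`, a user-defined composite, plugs in without any change to the collection class.

```python
class PCollection(object):
    """Toy, in-memory stand-in for a PCollection (illustration only)."""

    def __init__(self, elements):
        self.elements = list(elements)

    def apply(self, transform):
        # The single entry point: any object with an expand() method works,
        # so supporting a new transform never requires a new method here.
        return transform.expand(self)

    def __or__(self, transform):
        # Optional pipe syntax that simply delegates to apply().
        return self.apply(transform)


class PTransform(object):
    """Hypothetical base class; subclasses define expand(pcoll)."""

    def expand(self, pcoll):
        raise NotImplementedError


class Map(PTransform):
    def __init__(self, fn):
        self.fn = fn

    def expand(self, pcoll):
        return PCollection(self.fn(x) for x in pcoll.elements)


class Count(PTransform):
    def expand(self, pcoll):
        return PCollection([len(pcoll.elements)])


class CountLongWords(PTransform):
    """A user-defined composite built from other transforms."""

    def __init__(self, min_length):
        self.min_length = min_length

    def expand(self, pcoll):
        long_words = PCollection(
            w for w in pcoll.elements if len(w) >= self.min_length)
        return long_words.apply(Count())
```

With this shape a new transform is just a new class, and the collection keeps one generic entry point, which is the extensibility argument the post makes.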


incubator-beam-site git commit: Fix blog post typo.

2016-05-27 Thread robertwb
Repository: incubator-beam-site
Updated Branches:
  refs/heads/asf-site edd0705b8 -> 0c5c6471b


Fix blog post typo.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/0c5c6471
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/0c5c6471
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/0c5c6471

Branch: refs/heads/asf-site
Commit: 0c5c6471b8c6e632d3270e132f1514560319a92d
Parents: edd0705
Author: Robert Bradshaw 
Authored: Fri May 27 13:56:58 2016 -0700
Committer: Robert Bradshaw 
Committed: Fri May 27 13:56:58 2016 -0700

--
 _posts/2016-05-20-where-is-my-pcollection-dot-map.md | 2 +-
 content/blog/2016/05/27/where-is-my-pcollection-dot-map.html | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/0c5c6471/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
--
diff --git a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
index e9232d3..5fc13f0 100644
--- a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
+++ b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md
@@ -82,7 +82,7 @@ As pipelines grow and evolve, it is useful to structure your pipeline into modul
 
 
 
-Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flag view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms.
+Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flat view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms.
 
 
 ## Summary

http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/0c5c6471/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
--
diff --git a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
index 2af12f5..9f50244 100644
--- a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
+++ b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html
@@ -179,7 +179,7 @@ PCollectionO output = input
 
 
 
-Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flag view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms.
+Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flat view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms.
 
 
 Summary

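The caption describes three views of one pipeline. As a toy illustration, the three views can be derived from a nested transform tree. The transform names and nesting below are hypothetical, the pipeline is linear so its DAG reduces to a sequence, and `execution_view` is only a crude stand-in for runner fusion that starts a new stage at each `GroupByKey`; real runners make these decisions very differently.

```python
# Hypothetical nested transform tree for a WordCount-like pipeline:
# each node is (label, [children]); leaves are primitive operations.
PIPELINE = ('WordCount', [
    ('Read', []),
    ('CountWords', [
        ('ExtractWords', []),
        ('Count', [('PairWithOne', []), ('GroupByKey', []), ('Sum', [])]),
    ]),
    ('Format', []),
    ('Write', []),
])


def flat_view(node):
    """Flat view: only the leaf operations, in pipeline order."""
    label, children = node
    if not children:
        return [label]
    return [leaf for child in children for leaf in flat_view(child)]


def structured_view(node, depth=0):
    """Structured view: every transform, indented by its nesting depth."""
    label, children = node
    lines = ['  ' * depth + label]
    for child in children:
        lines.extend(structured_view(child, depth + 1))
    return lines


def execution_view(leaves, barrier='GroupByKey'):
    """Crude fusion model: consecutive steps share a stage, and each
    barrier (a shuffle) starts a new stage."""
    stages, current = [], []
    for leaf in leaves:
        if leaf == barrier and current:
            stages.append(current)
            current = []
        current.append(leaf)
    if current:
        stages.append(current)
    return stages
```

For this tree, `flat_view` yields seven primitive steps, `execution_view` splits them into two stages around the shuffle, and `structured_view` keeps `CountWords` and `Count` as enclosing groups.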


[GitHub] incubator-beam pull request: Optimize the Count CombineFn

2016-03-29 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/88

Optimize the Count CombineFn

Previously the accumulator was stored as a Long.  This uses
a singleton long[] to avoid the boxing and unboxing on every
increment.

This required changing the Coder (the format actually remains
the same, but we have no way of declaring that), so it is not
backwards compatible with reload.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam count

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/88.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #88


commit f2bb6e86bfd71f5cda4f574c6bedfaeff172291a
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-03-29T19:28:13Z

Optimize the Count CombineFn

Previously the accumulator was stored as a Long.  This uses
a singleton long[] to avoid the boxing and unboxing on every
increment.

This required changing the Coder (the format actually remains
the same, but we have no way of declaring that), so it is not
backwards compatible with reload.

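PR #88 switches the Java Count accumulator from a boxed `Long` to a singleton `long[]`, so each increment updates the accumulator in place. The Python sketch below shows only the shape of that mutable-accumulator pattern. `CountFn` and `combine_in_bundles` are hypothetical names; the method names merely mirror those of a Beam `CombineFn`, and nothing here imports Beam. The boxing cost the PR removes is a JVM concern: Python ints are immutable, so `accumulator[0] += 1` still rebinds the list cell to a different int object and gains nothing comparable.

```python
class CountFn(object):
    """Hypothetical standalone counter; method names mirror a CombineFn."""

    def create_accumulator(self):
        # A one-element list acts as a mutable cell, the analogue of long[1].
        return [0]

    def add_input(self, accumulator, element):
        # Update the cell in place rather than building a new accumulator.
        accumulator[0] += 1
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            merged[0] += acc[0]
        return merged

    def extract_output(self, accumulator):
        return accumulator[0]


def combine_in_bundles(fn, bundles):
    """Simulate a runner: one accumulator per bundle, then a final merge."""
    partials = []
    for bundle in bundles:
        acc = fn.create_accumulator()
        for element in bundle:
            acc = fn.add_input(acc, element)
        partials.append(acc)
    return fn.extract_output(fn.merge_accumulators(partials))
```

Empty bundles simply contribute a zero accumulator to the merge.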



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/3] incubator-beam git commit: Update docstring.

2016-07-25 Thread robertwb
Update docstring.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76f3864b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76f3864b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76f3864b

Branch: refs/heads/python-sdk
Commit: 76f3864b61ad9e2ac9a2e81e7a0d5993db5fde6c
Parents: 8a7bc71
Author: Charles Chen 
Authored: Mon Jul 25 18:36:45 2016 -0700
Committer: Charles Chen 
Committed: Mon Jul 25 18:36:45 2016 -0700

--
 sdks/python/apache_beam/utils/dependency.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76f3864b/sdks/python/apache_beam/utils/dependency.py
--
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index d8e0036..7d1ae41 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -1,3 +1,4 @@
+
 #
 # Licensed to the Apache Software Foundation (ASF) under one or more
 # contributor license agreements.  See the NOTICE file distributed with
@@ -439,7 +440,7 @@ def get_required_container_version():
 
 
 def get_sdk_name_and_version():
-  """Returns the name and version of the SDK reported to Cloud Dataflow."""
+  """Returns name and version of SDK reported to Google Cloud Dataflow."""
   # TODO(ccy): Make this check cleaner.
   container_version = get_required_container_version()
   if container_version == 'beamhead':



[3/3] incubator-beam git commit: Closes #730

2016-07-25 Thread robertwb
Closes #730


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26ff6579
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26ff6579
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26ff6579

Branch: refs/heads/python-sdk
Commit: 26ff6579531d598e91b220fa21ec9bd7f9220f78
Parents: 153916f 76f3864
Author: Robert Bradshaw 
Authored: Mon Jul 25 21:11:31 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 25 21:11:31 2016 -0700

--
 sdks/python/apache_beam/internal/apiclient.py |  6 +++---
 sdks/python/apache_beam/utils/dependency.py   | 12 
 2 files changed, 15 insertions(+), 3 deletions(-)
--




[1/3] incubator-beam git commit: Fix SDK name and version sent to the Cloud Dataflow service

2016-07-25 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 153916fe9 -> 26ff65795


Fix SDK name and version sent to the Cloud Dataflow service


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8a7bc71d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8a7bc71d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8a7bc71d

Branch: refs/heads/python-sdk
Commit: 8a7bc71dee1e3f9f17c29e5e558870a6bc5f4880
Parents: 38d9dea
Author: Charles Chen 
Authored: Mon Jul 25 18:22:08 2016 -0700
Committer: Charles Chen 
Committed: Mon Jul 25 18:22:08 2016 -0700

--
 sdks/python/apache_beam/internal/apiclient.py |  6 +++---
 sdks/python/apache_beam/utils/dependency.py   | 11 +++
 2 files changed, 14 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a7bc71d/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 363b8e1..137a40b 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -29,7 +29,6 @@ from apitools.base.py import encoding
 from apitools.base.py import exceptions
 
 from apache_beam import utils
-from apache_beam import version
 from apache_beam.internal import pickler
 from apache_beam.internal.auth import get_service_credentials
 from apache_beam.internal.json_value import to_json_value
@@ -39,6 +38,7 @@ from apache_beam.utils import dependency
 from apache_beam.utils import names
 from apache_beam.utils import retry
 from apache_beam.utils.dependency import get_required_container_version
+from apache_beam.utils.dependency import get_sdk_name_and_version
 from apache_beam.utils.names import PropertyNames
 from apache_beam.utils.options import GoogleCloudOptions
 from apache_beam.utils.options import StandardOptions
@@ -191,12 +191,12 @@ class Environment(object):
 self.proto.userAgent = dataflow.Environment.UserAgentValue()
 self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint
 
-version_string = version.__version__
+sdk_name, version_string = get_sdk_name_and_version()
 
 self.proto.userAgent.additionalProperties.extend([
 dataflow.Environment.UserAgentValue.AdditionalProperty(
 key='name',
-value=to_json_value('Google Cloud Dataflow SDK for Python')),
+value=to_json_value(sdk_name)),
 dataflow.Environment.UserAgentValue.AdditionalProperty(
 key='version', value=to_json_value(version_string))])
 # Version information.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a7bc71d/sdks/python/apache_beam/utils/dependency.py
--
diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py
index b809cf2..d8e0036 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -59,6 +59,7 @@ import tempfile
 
 
 from apache_beam import utils
+from apache_beam import version as beam_version
 from apache_beam.internal import pickler
 from apache_beam.utils import names
 from apache_beam.utils import processes
@@ -437,6 +438,16 @@ def get_required_container_version():
 return 'beamhead'
 
 
+def get_sdk_name_and_version():
+  """Returns the name and version of the SDK reported to Cloud Dataflow."""
+  # TODO(ccy): Make this check cleaner.
+  container_version = get_required_container_version()
+  if container_version == 'beamhead':
+    return ('Apache Beam SDK for Python', beam_version.__version__)
+  else:
+    return ('Google Cloud Dataflow SDK for Python', container_version)
+
+
 def _download_pypi_sdk_package(temp_dir):
   """Downloads SDK package from PyPI and returns path to local path."""
   # TODO(silviuc): Handle apache-beam versions when we have official releases.

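The new `get_sdk_name_and_version()` chooses the reported SDK name from the container version. To make that branch easy to check in isolation, the sketch below restates it as a pure function that takes both versions as arguments. `sdk_name_and_version` and `user_agent_properties` are hypothetical helpers, not part of the SDK, and the version strings used with them are made up.

```python
def sdk_name_and_version(container_version, beam_version):
    """Pure restatement of the branch in get_sdk_name_and_version():
    both versions are passed in so no SDK import is needed."""
    if container_version == 'beamhead':
        # 'beamhead' containers report the Apache Beam name and the
        # package version.
        return ('Apache Beam SDK for Python', beam_version)
    # Any other container version is reported under the Dataflow SDK name.
    return ('Google Cloud Dataflow SDK for Python', container_version)


def user_agent_properties(container_version, beam_version):
    """Hypothetical helper: the pair as a dict, mirroring the 'name' and
    'version' keys that apiclient.py adds to the userAgent proto."""
    name, version = sdk_name_and_version(container_version, beam_version)
    return {'name': name, 'version': version}
```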


[1/2] incubator-beam git commit: Made checksum_output optional in bigshuffle.py.

2016-07-12 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 95a591e05 -> 67a769a9a


Made checksum_output optional in bigshuffle.py.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b782ded
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b782ded
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b782ded

Branch: refs/heads/python-sdk
Commit: 2b782deddefc4ddf437cc4623ccb461332f0fe20
Parents: 95a591e
Author: Marian Dvorsky 
Authored: Mon Jul 11 11:47:01 2016 -0700
Committer: Robert Bradshaw 
Committed: Tue Jul 12 14:26:26 2016 -0700

--
 .../apache_beam/examples/cookbook/bigshuffle.py | 35 ++--
 1 file changed, 18 insertions(+), 17 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b782ded/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 8cbaa40..692bd52 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -42,7 +42,6 @@ def run(argv=None):
   required=True,
   help='Output file pattern to write results to.')
   parser.add_argument('--checksum_output',
-  required=True,
   help='Checksum output file pattern.')
   known_args, pipeline_args = parser.parse_known_args(argv)
 
@@ -59,24 +58,26 @@ def run(argv=None):
 'format',
 lambda (key, vals): ['%s%s' % (key, val) for val in vals]))
 
-  input_csum = (lines
-                | beam.Map('input-csum', crc32line)
-                | beam.CombineGlobally('combine-input-csum', sum)
-                | beam.Map('hex-format', lambda x: '%x' % x))
-  input_csum | beam.io.Write(
-      'write-input-csum',
-      beam.io.TextFileSink(known_args.checksum_output + '-input'))
-
   # Write the output using a "Write" transform that has side effects.
   output | beam.io.Write('write', beam.io.TextFileSink(known_args.output))
-  # Write the output checksum
-  output_csum = (output
-                 | beam.Map('output-csum', crc32line)
-                 | beam.CombineGlobally('combine-output-csum', sum)
-                 | beam.Map('hex-format-output', lambda x: '%x' % x))
-  output_csum | beam.io.Write(
-      'write-output-csum',
-      beam.io.TextFileSink(known_args.checksum_output + '-output'))
+
+  # Optionally write the input and output checksums.
+  if known_args.checksum_output:
+    input_csum = (lines
+                  | beam.Map('input-csum', crc32line)
+                  | beam.CombineGlobally('combine-input-csum', sum)
+                  | beam.Map('hex-format', lambda x: '%x' % x))
+    input_csum | beam.io.Write(
+        'write-input-csum',
+        beam.io.TextFileSink(known_args.checksum_output + '-input'))
+
+    output_csum = (output
+                   | beam.Map('output-csum', crc32line)
+                   | beam.CombineGlobally('combine-output-csum', sum)
+                   | beam.Map('hex-format-output', lambda x: '%x' % x))
+    output_csum | beam.io.Write(
+        'write-output-csum',
+        beam.io.TextFileSink(known_args.checksum_output + '-output'))
 
   # Actually run the pipeline (all operations above are deferred).
   p.run()

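After this change the checksum steps run only when `--checksum_output` is passed. The plain-Python sketch below mimics that flow without Beam: per-line CRCs are summed and formatted as hex, as in the `Map` / `CombineGlobally(sum)` / `Map` chain in the diff. `crc32line` is not shown in the diff, so the `zlib`-based definition here is an assumption, and `run` only simulates the split/group/format steps in memory. Because the checksum is a sum, it does not depend on line order, so the input and output checksums agree after the shuffle, provided every line is at most 99 characters (the `split` step keeps only `x[:10]` and `x[10:99]`).

```python
import argparse
import random
import zlib


def crc32line(line):
    # Assumption: crc32line returns the CRC-32 of a line; masked here to an
    # unsigned 32-bit value.
    return zlib.crc32(line.encode('utf-8')) & 0xffffffff


def checksum(lines):
    # Mirrors Map(crc32line) -> CombineGlobally(sum) -> Map('%x' % x).
    # Addition is order-independent, so a shuffled copy gives the same sum.
    return '%x' % sum(crc32line(line) for line in lines)


def run(lines, argv=None):
    parser = argparse.ArgumentParser()
    # No required=True: when the flag is omitted its value is None.
    parser.add_argument('--checksum_output',
                        help='Checksum output file pattern.')
    known_args, _ = parser.parse_known_args(argv)

    # In-memory stand-in for split -> GroupByKey -> format: rebuild each line
    # from its first 10 characters and characters 10..98, in arbitrary order.
    output = [line[:10] + line[10:99] for line in lines]
    random.shuffle(output)

    if not known_args.checksum_output:
        return None  # checksums are skipped entirely
    return {
        known_args.checksum_output + '-input': checksum(lines),
        known_args.checksum_output + '-output': checksum(output),
    }
```

Unknown flags (for example runner options) are left to the pipeline, as with `parse_known_args` in the example itself.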


[2/2] incubator-beam git commit: Closes #625

2016-07-12 Thread robertwb
Closes #625


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/67a769a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/67a769a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/67a769a9

Branch: refs/heads/python-sdk
Commit: 67a769a9a526397a44e01eda16e1941189dc654c
Parents: 95a591e 2b782de
Author: Robert Bradshaw 
Authored: Tue Jul 12 14:26:27 2016 -0700
Committer: Robert Bradshaw 
Committed: Tue Jul 12 14:26:27 2016 -0700

--
 .../apache_beam/examples/cookbook/bigshuffle.py | 35 ++--
 1 file changed, 18 insertions(+), 17 deletions(-)
--




[2/2] incubator-beam git commit: Closes #641

2016-07-14 Thread robertwb
Closes #641


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e8c39c79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e8c39c79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e8c39c79

Branch: refs/heads/python-sdk
Commit: e8c39c798e00444d5b1f6e53c3743fc41ac57b99
Parents: 67a769a f858ea9
Author: Robert Bradshaw 
Authored: Thu Jul 14 10:17:10 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:17:10 2016 -0700

--
 sdks/python/apache_beam/examples/cookbook/bigshuffle.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--




[1/2] incubator-beam git commit: Add type hints to bigshuffle to avoid pickle overhead.

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 67a769a9a -> e8c39c798


Add type hints to bigshuffle to avoid pickle overhead.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f858ea93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f858ea93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f858ea93

Branch: refs/heads/python-sdk
Commit: f858ea9335b38c67778f47de63e1d1d16dc79fee
Parents: 67a769a
Author: Robert Bradshaw 
Authored: Tue Jul 12 13:05:46 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:17:09 2016 -0700

--
 sdks/python/apache_beam/examples/cookbook/bigshuffle.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f858ea93/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
index 692bd52..0b5da02 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py
@@ -48,11 +48,14 @@ def run(argv=None):
   p = beam.Pipeline(argv=pipeline_args)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input))
+  lines = p | beam.io.Read(
+  'read', beam.io.TextFileSource(known_args.input,
+ coder=beam.coders.BytesCoder()))
 
   # Count the occurrences of each word.
   output = (lines
-| beam.Map('split', lambda x: (x[:10], x[10:99]))
+| beam.Map('split', lambda x: (x[:10], x[10:99])
+  ).with_output_types(beam.typehints.KV[str, str])
 | beam.GroupByKey('group')
 | beam.FlatMap(
 'format',

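The commit title says the type hints are there to avoid pickle overhead: with `BytesCoder` on the source and a `KV[str, str]` output type on `split`, the elements presumably no longer need a generic pickling fallback. The sketch below is a hypothetical illustration of why a typed encoding helps, not Beam's coder wire format: it length-prefixes each bytes component of a (key, value) pair with a base-128 varint and compares the size with `pickle.dumps` on the same tuple.

```python
import pickle


def encode_varint(n):
    """Encode a non-negative int as a base-128 varint."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # high bit set: more bytes follow
        else:
            out.append(bits)
            return bytes(out)


def decode_varint(buf, pos):
    """Decode a varint starting at buf[pos]; return (value, next_pos)."""
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_kv(key, value):
    """Illustrative format: length-prefixed key, then length-prefixed value."""
    return (encode_varint(len(key)) + key +
            encode_varint(len(value)) + value)


def decode_kv(buf):
    key_len, pos = decode_varint(buf, 0)
    key = buf[pos:pos + key_len]
    pos += key_len
    value_len, pos = decode_varint(buf, pos)
    return key, buf[pos:pos + value_len]
```

For a 99-byte line split as in the example, the typed encoding costs two length bytes on top of the payload, while pickling adds protocol, opcode, and tuple framing around the same data.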


[2/2] incubator-beam git commit: Fix typo in Dataflow runner monitoring message

2016-07-14 Thread robertwb
Fix typo in Dataflow runner monitoring message


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ffbaccaa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ffbaccaa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ffbaccaa

Branch: refs/heads/python-sdk
Commit: ffbaccaa823149ba76e95e1996f9c7a2ba606d3b
Parents: e8c39c7
Author: Charles Chen 
Authored: Tue Jul 12 22:00:22 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:18:05 2016 -0700

--
 sdks/python/apache_beam/internal/apiclient.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ffbaccaa/sdks/python/apache_beam/internal/apiclient.py
--
diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py
index 99c7090..363b8e1 100644
--- a/sdks/python/apache_beam/internal/apiclient.py
+++ b/sdks/python/apache_beam/internal/apiclient.py
@@ -466,7 +466,7 @@ class DataflowApplicationClient(object):
 # The response is a Job proto with the id for the new job.
 logging.info('Created job with id: [%s]', response.id)
 logging.info(
-'To accesss the Dataflow monitoring console, please navigate to '
+'To access the Dataflow monitoring console, please navigate to '
 'https://console.developers.google.com/project/%s/dataflow/job/%s',
 self.google_cloud_options.project, response.id)
 



[2/2] incubator-beam git commit: Closes #652

2016-07-14 Thread robertwb
Closes #652


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b9d81fc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b9d81fc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b9d81fc

Branch: refs/heads/python-sdk
Commit: 2b9d81fcc87ebd3bfa484c034fe46f79b63beef2
Parents: 52c0d89 cd0178b
Author: Robert Bradshaw 
Authored: Thu Jul 14 10:19:06 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:19:06 2016 -0700

--
 sdks/python/setup.py | 1 +
 1 file changed, 1 insertion(+)
--




[1/2] incubator-beam git commit: Added cy_combiners.py to the list of Cythonized Python SDK files.

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 52c0d893e -> 2b9d81fcc


Added cy_combiners.py to the list of Cythonized Python SDK files.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd0178b1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd0178b1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd0178b1

Branch: refs/heads/python-sdk
Commit: cd0178b1a2918a15005bdd485b0a554b1e3e6378
Parents: 52c0d89
Author: Marian Dvorsky 
Authored: Wed Jul 13 17:16:58 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:19:05 2016 -0700

--
 sdks/python/setup.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0178b1/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index 789d976..2287b8e 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -75,6 +75,7 @@ setuptools.setup(
 '**/*.pyx',
 'apache_beam/coders/coder_impl.py',
 'apache_beam/runners/common.py',
+'apache_beam/transforms/cy_combiners.py',
 'apache_beam/utils/counters.py',
 ]),
 install_requires=REQUIRED_PACKAGES,



[GitHub] incubator-beam pull request #641: Add type hints to bigshuffle to avoid pick...

2016-07-14 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/641




[1/2] incubator-beam git commit: Closes #644

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e8c39c798 -> 52c0d893e


Closes #644


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52c0d893
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52c0d893
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52c0d893

Branch: refs/heads/python-sdk
Commit: 52c0d893e73d8dae05acc5f5a6f16840ea79404f
Parents: e8c39c7 ffbacca
Author: Robert Bradshaw 
Authored: Thu Jul 14 10:18:05 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:18:05 2016 -0700

--
 sdks/python/apache_beam/internal/apiclient.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Temporarily reverting pickler changes (@4e2d8ab).

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 2b9d81fcc -> 7c0c27ac0


Temporarily reverting pickler changes (@4e2d8ab).

It causes infinite recursion when some tests are invoked directly,
outside the testing framework.

Added one such test to the tox tests to cover the failing case
(running the test directly through its main function).


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/245facdb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/245facdb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/245facdb

Branch: refs/heads/python-sdk
Commit: 245facdbc166298cdb96521165a7514653fa8e57
Parents: 2b9d81f
Author: Ahmet Altay 
Authored: Wed Jul 13 10:18:03 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:20:29 2016 -0700

--
 sdks/python/apache_beam/internal/pickler.py | 18 --
 sdks/python/tox.ini |  1 +
 2 files changed, 1 insertion(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/245facdb/sdks/python/apache_beam/internal/pickler.py
--
diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py
index f427aa5..898e04b 100644
--- a/sdks/python/apache_beam/internal/pickler.py
+++ b/sdks/python/apache_beam/internal/pickler.py
@@ -159,24 +159,6 @@ if 'save_module' in dir(dill.dill):
   return old_save_module_dict(pickler, obj)
   dill.dill.save_module_dict = new_save_module_dict
 
-
-  old_save_function = dill.dill.save_function
-
-  @dill.dill.register(types.FunctionType)
-  def new_save_function(pickler, obj):
-    globs = obj.__globals__ if dill.dill.PY3 else obj.func_globals
-    if (dill.dill.is_dill(pickler) and globs == pickler._main.__dict__
-        and not pickler._recurse):
-      try:
-        pickler._recurse = True
-        return old_save_function(pickler, obj)
-      finally:
-        pickler._recurse = False
-    else:
-      return old_save_function(pickler, obj)
-  dill.dill.save_function = new_save_function
-
-
   def _nest_dill_logging():
 """Prefix all dill logging with its depth in the callstack.
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/245facdb/sdks/python/tox.ini
--
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index 29674ed..5a2572e 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -31,6 +31,7 @@ deps=
   pep8
   pylint
 commands =
+  python apache_beam/examples/complete/autocomplete_test.py
   python setup.py test
   {toxinidir}/run_pylint.sh
 passenv = TRAVIS*

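The reverted `new_save_function` used `try`/`finally` to turn on dill's `_recurse` flag only while saving a function whose globals are the main module's dictionary, restoring it afterwards. The sketch below models just that save/restore pattern with a context manager. `FakePickler` and the `globals_are_main` argument are hypothetical simplifications (the real check compared the function's globals with `pickler._main.__dict__`), and the sketch does not reproduce the recursion bug described in the commit message.

```python
import contextlib


@contextlib.contextmanager
def temporarily_set(obj, attr, value):
    """Set obj.<attr> to value inside the with-block, then restore the old
    value even if the block raises."""
    old = getattr(obj, attr)
    setattr(obj, attr, value)
    try:
        yield obj
    finally:
        setattr(obj, attr, old)


class FakePickler(object):
    """Hypothetical stand-in for dill's pickler; only _recurse is modeled."""

    def __init__(self):
        self._recurse = False


def wrap_save_function(old_save_function):
    """Return a wrapper that enables _recurse while delegating, following
    the structure of the reverted new_save_function."""

    def new_save_function(pickler, obj, globals_are_main):
        if globals_are_main and not pickler._recurse:
            with temporarily_set(pickler, '_recurse', True):
                return old_save_function(pickler, obj)
        return old_save_function(pickler, obj)

    return new_save_function
```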


[2/2] incubator-beam git commit: Closes #645

2016-07-14 Thread robertwb
Closes #645


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c0c27ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c0c27ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c0c27ac

Branch: refs/heads/python-sdk
Commit: 7c0c27ac00fd0ab6b8c2b982540e644485efc60e
Parents: 2b9d81f 245facd
Author: Robert Bradshaw 
Authored: Thu Jul 14 10:20:59 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 10:20:59 2016 -0700

--
 sdks/python/apache_beam/internal/pickler.py | 18 --
 sdks/python/tox.ini |  1 +
 2 files changed, 1 insertion(+), 18 deletions(-)
--




[1/2] incubator-beam git commit: DoOutputsTuple cleanup

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 7c0c27ac0 -> 762a2930a


DoOutputsTuple cleanup

This prevents the same (logical) PCollection from being re-created
with a different producer.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2eb3b52
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2eb3b52
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2eb3b52

Branch: refs/heads/python-sdk
Commit: e2eb3b520ce742bbf4aef63046ed2dfdb96c32e5
Parents: 7c0c27a
Author: Robert Bradshaw 
Authored: Thu Jul 14 11:03:41 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 11:03:41 2016 -0700

--
 sdks/python/apache_beam/pvalue.py | 28 
 1 file changed, 16 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2eb3b52/sdks/python/apache_beam/pvalue.py
--
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 8552a45..6fc3041 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -58,15 +58,15 @@ class PValue(object):
     self.producer = None
 
   def __str__(self):
-    return '<%s>' % self._str_internal()
+    return self._str_internal()
 
   def __repr__(self):
     return '<%s at %s>' % (self._str_internal(), hex(id(self)))
 
   def _str_internal(self):
-    return '%s transform=%s' % (
-        self.__class__.__name__,
-        self.producer.transform if self.producer else 'n/a')
+    return "%s[%s.%s]" % (self.__class__.__name__,
+                          self.producer.full_label if self.producer else None,
+                          self.tag)
 
   def apply(self, *args, **kwargs):
     """Applies a transform or callable to a PValue.
@@ -191,16 +191,20 @@ class DoOutputsTuple(object):
     # Check if we accessed this tag before.
     if tag in self._pcolls:
       return self._pcolls[tag]
+
     if tag is not None:
       self._transform.side_output_tags.add(tag)
-    pcoll = PCollection(self._pipeline, tag=tag)
-    # Transfer the producer from the DoOutputsTuple to the resulting
-    # PCollection.
-    pcoll.producer = self.producer
-    # Add this as an output to both the inner ParDo and the outer _MultiParDo
-    # PTransforms.
-    self.producer.parts[0].add_output(pcoll, tag)
-    self.producer.add_output(pcoll, tag)
+      pcoll = PCollection(self._pipeline, tag=tag)
+      # Transfer the producer from the DoOutputsTuple to the resulting
+      # PCollection.
+      pcoll.producer = self.producer
+      # Add this as an output to both the inner ParDo and the outer _MultiParDo
+      # PTransforms.
+      self.producer.parts[0].add_output(pcoll, tag)
+      self.producer.add_output(pcoll, tag)
+    else:
+      # Main output is output of inner ParDo.
+      pcoll = self.producer.parts[0].outputs[0]
     self._pcolls[tag] = pcoll
     return pcoll
 

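The fix amounts to per-tag memoization. Each output tag maps to exactly one PCollection object. The main output (tag None) reuses the inner ParDo's existing output instead of getting a fresh PCollection with a different producer. Below is a minimal, Beam-independent sketch of that idea. The PCollection, Producer and OutputsTuple classes here are hypothetical stand-ins, not the SDK's classes.

```python
class PCollection(object):
  """Hypothetical stand-in: records only a tag and its producer."""

  def __init__(self, tag=None, producer=None):
    self.tag = tag
    self.producer = producer


class Producer(object):
  """Hypothetical stand-in for an applied ParDo and its outputs."""

  def __init__(self):
    # The main (untagged) output exists as soon as the transform is applied.
    self.outputs = [PCollection(tag=None, producer=self)]

  def add_output(self, pcoll):
    self.outputs.append(pcoll)


class OutputsTuple(object):
  """Per-tag memoization in the spirit of DoOutputsTuple.__getitem__."""

  def __init__(self, producer):
    self._producer = producer
    self._pcolls = {}

  def __getitem__(self, tag):
    # Return the PCollection created on a previous access to this tag.
    if tag in self._pcolls:
      return self._pcolls[tag]
    if tag is not None:
      # Side outputs are created lazily, exactly once, with the shared producer.
      pcoll = PCollection(tag=tag, producer=self._producer)
      self._producer.add_output(pcoll)
    else:
      # Reuse the existing main output rather than building a second one.
      pcoll = self._producer.outputs[0]
    self._pcolls[tag] = pcoll
    return pcoll
```

With this shape, `OutputsTuple(p)[None] is p.outputs[0]` holds, and repeated lookups of the same side tag return the same object.
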


[2/2] incubator-beam git commit: Closes #658

2016-07-14 Thread robertwb
Closes #658


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/762a2930
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/762a2930
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/762a2930

Branch: refs/heads/python-sdk
Commit: 762a2930a093c19580d9f631b923c126aceccc02
Parents: 7c0c27a e2eb3b5
Author: Robert Bradshaw 
Authored: Thu Jul 14 11:43:40 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 11:43:40 2016 -0700

--
 sdks/python/apache_beam/pvalue.py | 28 
 1 file changed, 16 insertions(+), 12 deletions(-)
--




[GitHub] incubator-beam pull request #658: DoOutputsTuple cleanup

2016-07-14 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/658

DoOutputsTuple cleanup

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License 
Agreement](https://www.apache.org/licenses/icla.txt).

---

This prevents the same (logical) PCollection from being re-created
with a different producer.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner-pvalues

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/658.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #658


commit e2eb3b520ce742bbf4aef63046ed2dfdb96c32e5
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-14T18:03:41Z

DoOutputsTuple cleanup

This prevents the same (logical) PCollection from being re-created
with a different producer.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #657: Accept runners by fully qualified name.

2016-07-14 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/657

Accept runners by fully qualified name.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/657.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #657


commit 683262903817e012efbc649d8fe73532876b7c5b
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-14T17:53:09Z

Accept runners by fully qualified name.






[3/3] incubator-beam git commit: Closes #657

2016-07-14 Thread robertwb
Closes #657


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b695068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b695068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b695068

Branch: refs/heads/python-sdk
Commit: 3b69506897db7f21cc09eaa5ac8c08e4ee15ad2c
Parents: 762a293 c055e84
Author: Robert Bradshaw 
Authored: Thu Jul 14 14:16:09 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 14:16:09 2016 -0700

--
 .../apache_beam/runners/dataflow_runner.py  |  4 ++
 sdks/python/apache_beam/runners/runner.py   | 39 ++--
 sdks/python/apache_beam/runners/runner_test.py  |  2 +-
 3 files changed, 24 insertions(+), 21 deletions(-)
--




[1/3] incubator-beam git commit: Accept runners by fully qualified name.

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 762a2930a -> 3b6950689


Accept runners by fully qualified name.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fe8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fe8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fe8954

Branch: refs/heads/python-sdk
Commit: 84fe8954669ef9a30448d140b5a578c14b863819
Parents: 762a293
Author: Robert Bradshaw 
Authored: Thu Jul 14 10:53:09 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 14:15:59 2016 -0700

--
 sdks/python/apache_beam/runners/runner.py  | 28 ++---
 sdks/python/apache_beam/runners/runner_test.py |  2 +-
 2 files changed, 14 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner.py
--
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 55b63f3..98f9758 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,26 +41,24 @@ def create_runner(runner_name):
     RuntimeError: if an invalid runner name is used.
   """
   # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name == 'DirectPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DirectPipelineRunner()
-  if runner_name == 'DiskCachedPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DiskCachedPipelineRunner(
-    )
-  if runner_name == 'EagerPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.EagerPipelineRunner()
-  elif runner_name in ('DataflowPipelineRunner',
-                       'BlockingDataflowPipelineRunner'):
+  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                     'EagerPipelineRunner'):
+    runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+
+  if runner_name in ('DataflowPipelineRunner',
+                     'BlockingDataflowPipelineRunner'):
     import apache_beam.runners.dataflow_runner
     return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
         blocking=runner_name == 'BlockingDataflowPipelineRunner')
+  elif '.' in runner_name:
+    module, runner = runner_name.rsplit('.', 1)
+    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
-    raise RuntimeError(
+    raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
-        'BlockingDataflowPipelineRunner.' % runner_name)
+        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
+        'BlockingDataflowPipelineRunner or the fully qualified name of '
+        'a PipelineRunner subclass.' % runner_name)
 
 
 class PipelineRunner(object):

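The new create_runner branch splits a dotted name with `rsplit('.', 1)`, imports the module, and instantiates the named class. The `__import__(module, {}, {}, [runner], -1)` call in the diff is Python 2 specific, because Python 3 rejects a negative import level. Below is a sketch of the same lookup using `importlib.import_module`, which works on both versions. The `_ALIASES` table and the `create_by_name` function are hypothetical names used only for illustration.

```python
import importlib

# Hypothetical alias table: short names resolved to a module path, as the
# diff does for the direct runners.
_ALIASES = {
    'DirectPipelineRunner': 'apache_beam.runners.direct_runner',
    'EagerPipelineRunner': 'apache_beam.runners.direct_runner',
}


def create_by_name(name):
  """Instantiates a class given a known alias or a fully qualified name."""
  if name in _ALIASES:
    name = _ALIASES[name] + '.' + name
  if '.' not in name:
    raise ValueError(
        'Unexpected runner: %s. Use a known alias or the fully qualified '
        'name of a class.' % name)
  module_name, class_name = name.rsplit('.', 1)
  module = importlib.import_module(module_name)
  return getattr(module, class_name)()


# Any importable class works; a stdlib class keeps the example self-contained.
counter = create_by_name('collections.Counter')
```

As in the diff, an unknown bare name raises ValueError rather than RuntimeError.
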
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner_test.py
--
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 20a7259..d2e70d7 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -44,7 +44,7 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('BlockingDataflowPipelineRunner'),
                    DataflowPipelineRunner))
-    self.assertRaises(RuntimeError, create_runner, 'xyz')
+    self.assertRaises(ValueError, create_runner, 'xyz')
 
   def test_remote_runner_translation(self):
     remote_runner = DataflowPipelineRunner()



[1/2] incubator-beam git commit: Closes #653

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3b6950689 -> c8cef2cba


Closes #653


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8cef2cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8cef2cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8cef2cb

Branch: refs/heads/python-sdk
Commit: c8cef2cbac10b2a3be79b7087c96e254ce7aedd3
Parents: 3b69506 afc68bc
Author: Robert Bradshaw 
Authored: Thu Jul 14 14:18:43 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 14:18:43 2016 -0700

--
 sdks/python/apache_beam/coders/coders_test_common.py   | 4 ++--
 .../apache_beam/examples/complete/top_wikipedia_sessions.py| 6 +++---
 sdks/python/apache_beam/transforms/timeutil.py | 5 ++---
 3 files changed, 7 insertions(+), 8 deletions(-)
--




[2/2] incubator-beam git commit: Fix min and max timestamp on 32-bit machines

2016-07-14 Thread robertwb
Fix min and max timestamp on 32-bit machines


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/afc68bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/afc68bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/afc68bc3

Branch: refs/heads/python-sdk
Commit: afc68bc3aee9cafbe865628a9683bffdd73853ec
Parents: 3b69506
Author: Charles Chen 
Authored: Thu Jul 14 00:39:01 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 14 14:18:43 2016 -0700

--
 sdks/python/apache_beam/coders/coders_test_common.py   | 4 ++--
 .../apache_beam/examples/complete/top_wikipedia_sessions.py| 6 +++---
 sdks/python/apache_beam/transforms/timeutil.py | 5 ++---
 3 files changed, 7 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/afc68bc3/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 07436cb..0266fdc 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -19,7 +19,6 @@
 
 import logging
 import math
-import sys
 import unittest
 
 import dill
@@ -121,9 +120,10 @@ class CodersTest(unittest.TestCase):
     # Multi-byte encoding starts at 128
     self.check_coder(coders.VarIntCoder(), *range(120, 140))
     # Large values
+    MAX_64_BIT_INT = 0x7fffffffffffffff
     self.check_coder(coders.VarIntCoder(),
                      *[int(math.pow(-1, k) * math.exp(k))
-                       for k in range(0, int(math.log(sys.maxint)))])
+                       for k in range(0, int(math.log(MAX_64_BIT_INT)))])
 
   def test_float_coder(self):
 self.check_coder(coders.FloatCoder(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/afc68bc3/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
--
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 55b7857..7337910 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -42,7 +42,6 @@ from __future__ import absolute_import
 import argparse
 import json
 import logging
-import sys
 
 import apache_beam as beam
 from apache_beam import combiners
@@ -50,6 +49,7 @@ from apache_beam import window
 
 ONE_HOUR_IN_SECONDS = 3600
 THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
+MAX_TIMESTAMP = 0x7fffffffffffffff
 
 
 class ExtractUserAndTimestampDoFn(beam.DoFn):
@@ -128,8 +128,8 @@ class ComputeTopSessions(beam.PTransform):
     return (pcoll
             | beam.ParDo('ExtractUserAndTimestamp',
                          ExtractUserAndTimestampDoFn())
-            | beam.Filter(
-                lambda x: abs(hash(x)) <= sys.maxint * self.sampling_threshold)
+            | beam.Filter(lambda x: (abs(hash(x)) <=
+                                     MAX_TIMESTAMP * self.sampling_threshold))
             | ComputeSessions()
             | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn())
             | TopPerMonth()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/afc68bc3/sdks/python/apache_beam/transforms/timeutil.py
--
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index f72a9e4..4092b60 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -23,7 +23,6 @@ from abc import ABCMeta
 from abc import abstractmethod
 
 import datetime
-import sys
 
 
 class Timestamp(object):
@@ -115,8 +114,8 @@ class Timestamp(object):
     return Duration(micros=self.micros % other.micros)
 
 
-MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1)
-MAX_TIMESTAMP = Timestamp(micros=sys.maxint)
+MIN_TIMESTAMP = Timestamp(micros=-0x7fffffffffffffff - 1)
+MAX_TIMESTAMP = Timestamp(micros=0x7fffffffffffffff)
 
 
 class Duration(object):

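Why sys.maxint was a problem: on a 32-bit CPython 2 build it is 2**31 - 1. Read as microseconds, MIN_TIMESTAMP and MAX_TIMESTAMP would then cover only about 36 minutes on either side of the epoch, and the VarIntCoder test would exercise a smaller range. Spelling out the signed 64-bit bounds makes them the same on every platform. Below is a small sketch of those bounds; `fits_in_int64` is a hypothetical helper added for illustration.

```python
# Signed 64-bit bounds written out explicitly, instead of being derived from
# the interpreter's native word size (sys.maxint on Python 2).
MAX_64_BIT_INT = 0x7fffffffffffffff
MIN_64_BIT_INT = -MAX_64_BIT_INT - 1

# On a 32-bit Python 2 build sys.maxint is 2**31 - 1; read as microseconds,
# that is only about 36 minutes.
MAX_32_BIT_INT = 2**31 - 1
minutes_32 = MAX_32_BIT_INT / 1e6 / 60


def fits_in_int64(micros):
  """Returns True if a microsecond count fits in a signed 64-bit integer."""
  return MIN_64_BIT_INT <= micros <= MAX_64_BIT_INT
```
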


[1/2] incubator-beam git commit: Update some of the example tests to use assert_that

2016-07-14 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk c8cef2cba -> a1a51c3c1


Update some of the example tests to use assert_that


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fef464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fef464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fef464

Branch: refs/heads/python-sdk
Commit: 84fef464b0abea41e318c0fe983ac43874e5f6ad
Parents: c8cef2c
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 13 16:28:46 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 14 17:37:02 2016 -0700

--
 .../examples/complete/autocomplete_test.py  | 34 ++---
 .../examples/complete/estimate_pi.py| 19 +++--
 .../examples/complete/estimate_pi_test.py   | 36 +-
 .../examples/cookbook/coders_test.py| 33 +++--
 .../examples/cookbook/custom_ptransform.py  | 74 ++--
 .../examples/cookbook/custom_ptransform_test.py | 41 ---
 .../examples/cookbook/group_with_coder_test.py  | 70 --
 sdks/python/apache_beam/transforms/util.py  | 13 
 8 files changed, 139 insertions(+), 181 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/autocomplete_test.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index bd0a6cb..1b3ee5f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -15,43 +15,17 @@
 # limitations under the License.
 #
 
-"""Test for the wordcount example."""
+"""Test for the autocomplete example."""
 
-import collections
 import unittest
 
-
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
-from apache_beam.pvalue import AsIter
-
-# TODO(robertwb): Move to testing utilities.
-
-
-def assert_that(pcoll, matcher):
-  """Asserts that the give PCollection satisfies the constraints of the matcher
-  in a way that is runnable locally or on a remote service.
-  """
-  singleton = pcoll.pipeline | beam.Create('create_singleton', [None])
-
-  def check_matcher(_, side_value):
-    assert matcher(side_value)
-    return []
-  singleton | beam.FlatMap(check_matcher, AsIter(pcoll))  # pylint: disable=expression-not-assigned
-
-
-def contains_in_any_order(expected):
-  def matcher(value):
-    vs = collections.Counter(value)
-    es = collections.Counter(expected)
-    if vs != es:
-      raise ValueError(
-          'extra: %s, missing: %s' % (vs - es, es - vs))
-    return True
-  return matcher
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import contains_in_any_order
 
 
-class WordCountTest(unittest.TestCase):
+class AutocompleteTest(unittest.TestCase):
 
   WORDS = ['this', 'this', 'that', 'to', 'to', 'to']
 

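The matcher moved out of autocomplete_test.py (now imported from apache_beam.transforms.util) compares multisets with collections.Counter. Element order is ignored, but duplicate counts still matter. Below is a standalone sketch of that matcher that runs without a pipeline. It mirrors the removed test code above and does not quote the util module's current version.

```python
import collections


def contains_in_any_order(expected):
  """Returns a matcher that checks a value equals `expected` as a multiset."""
  def matcher(value):
    actual_counts = collections.Counter(value)
    expected_counts = collections.Counter(expected)
    if actual_counts != expected_counts:
      # Counter subtraction drops non-positive counts, so these are exactly
      # the surplus and the missing elements.
      raise ValueError('extra: %s, missing: %s' % (
          actual_counts - expected_counts, expected_counts - actual_counts))
    return True
  return matcher


check = contains_in_any_order(['this', 'to', 'to'])
check(['to', 'this', 'to'])  # passes: order is ignored
```
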
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/estimate_pi.py
--
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 8b0f202..3c4a2d9 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -85,6 +85,20 @@ class JsonCoder(object):
     return json.dumps(x)
 
 
+class EstimatePiTransform(beam.PTransform):
+  """Runs 10M trials, and combine the results to estimate pi."""
+
+  def __init__(self, label):
+    super(EstimatePiTransform, self).__init__(label)
+
+  def apply(self, pcoll):
+    # A hundred work items of a hundred thousand tries each.
+    return (pcoll
+            | beam.Create('Initialize', [100000] * 100).with_output_types(int)
+            | beam.Map('Run trials', run_trials)
+            | beam.CombineGlobally('Sum', combine_results).without_defaults())
+
+
 def run(argv=None):
 
   parser = argparse.ArgumentParser()
@@ -94,11 +108,8 @@ def run(argv=None):
   known_args, pipeline_args = parser.parse_known_args(argv)
 
   p = beam.Pipeline(argv=pipeline_args)
-  # A thousand work items of a million tries each.
   (p  # pylint: disable=expression-not-assigned
-   | beam.Create('Initialize', [100000] * 100).with_output_types(int)
-   | beam.Map('Run trials', run_trials)
-   | beam.CombineGlobally('Sum', combine_results).without_d

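EstimatePiTransform in the estimate_pi.py diff above splits 10M trials into 100 work items of 100,000 trials each. It maps run_trials over them and sums the results with CombineGlobally. The bodies of run_trials and combine_results are not part of this diff. What follows is therefore a hypothetical pure-Python sketch of the same map-then-combine shape. It assumes each work item reports (trials, hits), where hits are random points that land inside the quarter circle. The fraction of hits approximates pi/4.

```python
import random


def run_trials(runs, rng=random):
  """Throws `runs` random points at the unit square.

  Hypothetical stand-in for the example's run_trials: returns (runs, hits),
  where hits fall inside the quarter circle x*x + y*y <= 1.
  """
  hits = 0
  for _ in range(runs):
    x, y = rng.random(), rng.random()
    if x * x + y * y <= 1.0:
      hits += 1
  return runs, hits


def combine_results(results):
  """Sums per-item (runs, hits) pairs and derives the estimate of pi."""
  results = list(results)  # may be a one-shot iterator; it is traversed twice
  total = sum(r[0] for r in results)
  hits = sum(r[1] for r in results)
  return total, hits, 4.0 * hits / total


# Same shape as the pipeline (Create -> Map -> CombineGlobally), scaled down
# to 100 work items of 1,000 trials so it runs quickly.
total, hits, pi_estimate = combine_results(
    run_trials(n) for n in [1000] * 100)
```
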
[GitHub] incubator-beam pull request #706: Simple DoFnRunner optimizations.

2016-07-21 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/706

Simple DoFnRunner optimizations.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam no-logging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/706.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #706


commit f9da1cde8d4f7828f0d39d04e84e4bdda20093e2
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-21T19:08:33Z

Remove expensive per-element-step logging context.

This is 3-4% of the total runtime for something that is rarely
used.  We don't do this in Java either.

commit c415c229e22012574be56c53e2df2c46d5c37e08
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-21T19:11:03Z

Cache dofn.process method.

Saves another couple percent.

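Caching dofn.process hoists an attribute lookup out of the per-element path. The common.py diff for commit de35f282 shows the result stored as `self.dofn_process = fn.process`. Below is a rough sketch of the idea with hypothetical UpperDoFn and CachingRunner classes. The printed timings vary by machine and interpreter. They are not the percentages quoted in the commit messages.

```python
import timeit


class UpperDoFn(object):
  """Hypothetical DoFn-like object."""

  def process(self, element):
    return element.upper()


class CachingRunner(object):
  """Looks up the bound process method once instead of once per element."""

  def __init__(self, dofn):
    self.dofn = dofn
    # Cached bound method: per-element calls skip the lookups of self.dofn
    # and .process (and, on older CPythons, building a new bound method).
    self.dofn_process = dofn.process

  def process_uncached(self, element):
    return self.dofn.process(element)

  def process_cached(self, element):
    return self.dofn_process(element)


runner = CachingRunner(UpperDoFn())
uncached = timeit.timeit(lambda: runner.process_uncached('a'), number=100000)
cached = timeit.timeit(lambda: runner.process_cached('a'), number=100000)
print('uncached %.3fs, cached %.3fs' % (uncached, cached))
```
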





[3/4] incubator-beam git commit: Allow passing logging context directly.

2016-07-25 Thread robertwb
Allow passing logging context directly.

This is better than passing a logger module with a specific class.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de35f282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de35f282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de35f282

Branch: refs/heads/python-sdk
Commit: de35f28294dfb4f47b429635eff843b38279e240
Parents: d20cf64
Author: Robert Bradshaw 
Authored: Sat Jul 23 00:54:11 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 25 11:57:42 2016 -0700

--
 sdks/python/apache_beam/runners/common.py | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de35f282/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 9c9ab22..c017704 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -62,7 +62,9 @@ class DoFnRunner(Receiver):
                context,
                tagged_receivers,
                logger=None,
-               step_name=None):
+               step_name=None,
+               # Preferred alternative to logger
+               logging_context=None):
     if not args and not kwargs:
       self.dofn = fn
       self.dofn_process = fn.process
@@ -85,9 +87,13 @@ class DoFnRunner(Receiver):
     self.window_fn = windowing.windowfn
     self.context = context
     self.tagged_receivers = tagged_receivers
-    self.logging_context = get_logging_context(logger, step_name=step_name)
     self.step_name = step_name
 
+    if logging_context:
+      self.logging_context = logging_context
+    else:
+      self.logging_context = get_logging_context(logger, step_name=step_name)
+
     # Optimize for the common case.
     self.main_receivers = as_receiver(tagged_receivers[None])
 

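The DoFnRunner change prefers a caller-supplied logging_context. It falls back to `get_logging_context(logger, step_name=step_name)` only when none is given. Below is a sketch of that fallback. Here `default_logging_context` is a hypothetical stand-in for the SDK helper and DoFnRunnerSketch is an illustrative class.

```python
import contextlib


@contextlib.contextmanager
def default_logging_context(step_name):
  """Hypothetical stand-in for get_logging_context(logger, step_name=...)."""
  yield 'default:%s' % step_name


class DoFnRunnerSketch(object):

  def __init__(self, step_name=None, logging_context=None):
    if logging_context:
      # Preferred path: use the caller's context object as-is.
      self.logging_context = logging_context
    else:
      # Fallback: build a context from the step name, as before the change.
      self.logging_context = default_logging_context(step_name)
```

Like the diff, this checks truthiness (`if logging_context:`). An `is not None` check would be stricter if a context object could ever evaluate as false.
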


[4/4] incubator-beam git commit: Closes #721

2016-07-25 Thread robertwb
Closes #721


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/153916fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/153916fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/153916fe

Branch: refs/heads/python-sdk
Commit: 153916fe9f641f5fe5e8d473caef67f449ee6bca
Parents: 38d9dea de35f28
Author: Robert Bradshaw 
Authored: Mon Jul 25 12:02:02 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 25 12:02:02 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd | 30 ---
 sdks/python/apache_beam/runners/common.py  | 72 ++---
 2 files changed, 86 insertions(+), 16 deletions(-)
--




[GitHub] incubator-beam pull request #721: Make DoFnRunner a Receiver.

2016-07-25 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/721




[GitHub] incubator-beam pull request #741: Top improvements

2016-07-27 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/741

Top improvements



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam top

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/741.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #741


commit 9151fcf861704c550c21f411a9047f955f179ec4
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2016-07-27T17:09:49Z

Better top implementation.

When selecting the top k of n, it is common that k << n.
Using a heap is O(n log k) while select algorithms can
achieve O(n + k log k).

This also avoids the ugliness that heapq does not take the
comparator as an argument, resulting in _HeapItem classes that
were cumbersome and expensive to serialize.

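Here is a sketch of the buffered selection approach described above. Elements are appended to a buffer of bounded size. When the buffer overflows, an expected-linear quickselect keeps only the k largest, and only the final k are sorted. The buffer sizing copies `min(2 * n, n + 1000)` from the TopCombineFn diff in commit adb3ed93. The `top_k_unordered` helper and the BufferedTopK class are illustrative, not the SDK implementation, which per that diff also tracks a threshold element.

```python
import random


def top_k_unordered(items, k):
  """Returns the k largest items, in arbitrary order.

  Randomized quickselect with a three-way partition; expected time is
  linear in len(items).
  """
  result = []
  while k > 0 and items:
    pivot = random.choice(items)
    larger = [x for x in items if x > pivot]
    equal = [x for x in items if x == pivot]
    if len(larger) >= k:
      # All of the top k lie strictly above the pivot.
      items = larger
    elif len(larger) + len(equal) >= k:
      # The pivot value completes the top k.
      return result + larger + equal[:k - len(larger)]
    else:
      # Keep everything >= pivot and keep selecting among the rest.
      result += larger + equal
      k -= len(larger) + len(equal)
      items = [x for x in items if x < pivot]
  return result


class BufferedTopK(object):
  """Tracks the k largest elements seen, pruning a bounded buffer."""

  def __init__(self, k):
    self._k = k
    # Same sizing as the TopCombineFn diff: min(2 * n, n + 1000).
    self._buffer_size = min(2 * k, k + 1000)
    self._buffer = []

  def add(self, element):
    self._buffer.append(element)
    if len(self._buffer) > self._buffer_size:
      # Linear-time prune back down to k elements.
      self._buffer = top_k_unordered(self._buffer, self._k)

  def result(self):
    # Only the final k survivors are sorted: O(k log k).
    return sorted(top_k_unordered(self._buffer, self._k), reverse=True)
```

With a buffer of 2k, each prune costs O(k) and happens once per k + 1 insertions. That amortized O(1) cost per element is where the O(n + k log k) bound comes from.
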
commit e38d6f60f18bf6ad61f2d387c67efa2012af2e14
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2016-07-27T19:23:02Z

Allow Top operations to take key argument rather than compare, and order
by the natural ordering if neither is specified.

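One way to accept a key, a compare function, or neither is to normalize all three into a single less-than function. In this sketch `compare(a, b)` is read as "a < b", which matches how the TopCombineFn diff uses its comparator. The `make_less_than` helper is hypothetical, and the commit's actual signature is not shown here.

```python
def make_less_than(compare=None, key=None):
  """Builds a less-than function from a comparator, a key, or neither.

  Hypothetical helper: compare(a, b) is read as "a < b".
  """
  if compare is not None and key is not None:
    raise ValueError('Specify at most one of compare and key.')
  if compare is not None:
    return compare
  if key is not None:
    return lambda a, b: key(a) < key(b)
  # Neither given: use the elements' natural ordering.
  return lambda a, b: a < b


by_length = make_less_than(key=len)
natural = make_less_than()
```

A key is often simpler to pass around than a comparator, and Python 3's sorted() accepts only key, not cmp.
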





[2/2] incubator-beam git commit: Closes #726

2016-07-28 Thread robertwb
Closes #726


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53ab635c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53ab635c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53ab635c

Branch: refs/heads/python-sdk
Commit: 53ab635c75e4b94b3930a601911a717ddc499efe
Parents: 26ff657 2f4054b
Author: Robert Bradshaw 
Authored: Thu Jul 28 11:05:56 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 28 11:05:56 2016 -0700

--
 sdks/python/apache_beam/io/fileio.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--




[1/2] incubator-beam git commit: Make TextFileReader observable

2016-07-28 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 26ff65795 -> 53ab635c7


Make TextFileReader observable

This allows future implementation of size tracking for elements in side input 
sources.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4054ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4054ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4054ba

Branch: refs/heads/python-sdk
Commit: 2f4054ba37da1c1100f45a572d96e7a6e2e60152
Parents: 26ff657
Author: Charles Chen 
Authored: Mon Jul 25 11:44:22 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 28 11:05:55 2016 -0700

--
 sdks/python/apache_beam/io/fileio.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f4054ba/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 3afaae8..b1e091b 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -745,10 +745,12 @@ class NativeTextFileSink(iobase.NativeSink):
 # TextFileReader, TextMultiFileReader.
 
 
-class TextFileReader(iobase.NativeSourceReader):
+class TextFileReader(iobase.NativeSourceReader,
+                     coders.observable.ObservableMixin):
   """A reader for a text file source."""
 
   def __init__(self, source):
+    super(TextFileReader, self).__init__()
     self.source = source
     self.start_offset = self.source.start_offset or 0
     self.end_offset = self.source.end_offset
@@ -778,6 +780,7 @@ class TextFileReader(iobase.NativeSourceReader):
       self._file.seek(self.start_offset - 1)
       self.current_offset -= 1
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
     else:
       self._file.seek(self.start_offset)
@@ -801,6 +804,7 @@ class TextFileReader(iobase.NativeSourceReader):
           # a dynamic split request from the service.
           return
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
       if self.source.strip_trailing_newlines:
         line = line.rstrip('\n')

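TextFileReader now mixes in coders.observable.ObservableMixin and calls `notify_observers(line, is_encoded=True)` on every raw line. This lets a size-tracking observer be attached later. The mixin's code is not part of this diff. The sketch below is a minimal observer pattern in the same spirit, and ObservableSketch, register_observer, LineReader and ByteCounter are assumed names. The super().__init__() call in LineReader plays the same role as the one added in the diff: it runs the base constructor that creates the observer list.

```python
class ObservableSketch(object):
  """Minimal observer mixin: callbacks receive each value plus keywords."""

  def __init__(self):
    self._observers = []

  def register_observer(self, callback):
    self._observers.append(callback)

  def notify_observers(self, value, **kwargs):
    for callback in self._observers:
      callback(value, **kwargs)


class LineReader(ObservableSketch):
  """Hypothetical reader that reports every raw line it reads."""

  def __init__(self, lines):
    super(LineReader, self).__init__()
    self._lines = lines

  def __iter__(self):
    for line in self._lines:
      # Report the raw (still encoded) line before any stripping.
      self.notify_observers(line, is_encoded=True)
      yield line.rstrip('\n')


class ByteCounter(object):
  """Example observer: totals the size of the raw lines it sees."""

  def __init__(self):
    self.total = 0

  def __call__(self, value, is_encoded=False):
    self.total += len(value)


reader = LineReader(['ab\n', 'cde\n'])
counter = ByteCounter()
reader.register_observer(counter)
stripped = list(reader)
```
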


[3/3] incubator-beam git commit: Closes #741

2016-07-28 Thread robertwb
Closes #741


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4716d9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4716d9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4716d9d

Branch: refs/heads/python-sdk
Commit: b4716d9dcf9c3f198fc72e181adc0bae8b6fa818
Parents: 53ab635 866a09d
Author: Robert Bradshaw 
Authored: Thu Jul 28 11:08:06 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 28 11:08:06 2016 -0700

--
 sdks/python/apache_beam/transforms/combiners.py | 201 ---
 .../apache_beam/transforms/combiners_test.py|  42 +++-
 2 files changed, 164 insertions(+), 79 deletions(-)
--




[1/3] incubator-beam git commit: Better top implementation.

2016-07-28 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 53ab635c7 -> b4716d9dc


Better top implementation.

When selecting the top k of n, it is common that k << n.
Using a heap is O(n log k) while select algorithms can
achieve O(n + k log k).

This also avoids the ugliness that heapq does not take the
comparator as an argument, resulting in _HeapItem classes that
were cumbersome and expensive to serialize.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/adb3ed93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/adb3ed93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/adb3ed93

Branch: refs/heads/python-sdk
Commit: adb3ed93053c83b4e28e7baa879e9aee82f02785
Parents: 53ab635
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Wed Jul 27 10:09:49 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Jul 28 11:08:05 2016 -0700

--
 sdks/python/apache_beam/transforms/combiners.py | 111 +--
 1 file changed, 51 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adb3ed93/sdks/python/apache_beam/transforms/combiners.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners.py 
b/sdks/python/apache_beam/transforms/combiners.py
index 8c56e5a..453c0f8 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -228,76 +228,67 @@ class TopCombineFn(core.CombineFn):
   apply call become additional arguments to the comparator.
   """
 
-  # Actually pickling the comparison operators (including, often, their
-  # entire globals) can be very expensive.  Instead refer to them by index
-  # in this dictionary, which is populated on construction (including
-  # unpickling).
-  compare_by_id = {}
-
-  def __init__(self, n, compare, _compare_id=None):  # pylint: disable=invalid-name
+  # TODO(robertwb): Allow taking a key rather than a compare.
+  def __init__(self, n, compare):
     self._n = n
+    self._buffer_size = min(2 * n, n + 1000)
     self._compare = compare
-    self._compare_id = _compare_id or id(compare)
-    TopCombineFn.compare_by_id[self._compare_id] = self._compare
-
-  def __reduce_ex__(self, _):
-    return TopCombineFn, (self._n, self._compare, self._compare_id)
 
-  class _HeapItem(object):
-    """A wrapper for values supporting arbitrary comparisons.
-
-    The heap implementation supplied by Python is a min heap that always uses
-    the __lt__ operator if one is available. This wrapper overloads __lt__,
-    letting us specify arbitrary precedence for elements in the PCollection.
-    """
+  def create_accumulator(self, *args, **kwargs):
+    return None, []
 
-    def __init__(self, item, compare_id, *args, **kwargs):
-      # item: wrapped item.
-      # compare:  an implementation of the pairwise < operator.
-      # args, kwargs: extra arguments supplied to the compare function.
-      self.item = item
-      self.compare_id = compare_id
-      self.args = args
-      self.kwargs = kwargs
+  def add_input(self, accumulator, element, *args, **kwargs):
+    if args or kwargs:
+      lt = lambda a, b: self._compare(a, b, *args, **kwargs)
+    else:
+      lt = self._compare
 
-    def __lt__(self, other):
-      return TopCombineFn.compare_by_id[self.compare_id](
-          self.item, other.item, *self.args, **self.kwargs)
+    threshold, buffer = accumulator
+    if len(buffer) < self._n:
+      if not buffer:
+        return element, [element]
+      else:
+        buffer.append(element)
+        if lt(element, threshold):  # element < threshold
+          return element, buffer
+        else:
+          return accumulator  # with mutated buffer
+    elif lt(threshold, element):  # threshold < element
+      buffer.append(element)
+      if len(buffer) < self._buffer_size:
+        return accumulator
+      else:
+        buffer.sort(cmp=lambda a, b: (not lt(a, b)) - (not lt(b, a)))
+        return buffer[-self._n], buffer[-self._n:]
+    else:
+      return accumulator
 
-  def create_accumulator(self, *args, **kwargs):
-    return []  # Empty heap.
-
-  def add_input(self, heap, element, *args, **kwargs):
-    # Note that because heap is a min heap, heappushpop will discard incoming
-    # elements that are lesser (according to compare) than those in the heap
-    # (since that's what you would get if you pushed a small element on and
-    # popped the smallest element off). So, filtering a collection with a
-    # min-heap gives you the largest elements in the collection.
-    item = self._HeapItem(element, self._compare_i

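The new `add_input` sorts its buffer with `cmp=lambda a, b: (not lt(a, b)) - (not lt(b, a))`, turning the user's less-than predicate into a three-way comparator. Python 3 removed the `cmp=` argument to `sort`, so here is a small sketch of the same trick via `functools.cmp_to_key`; the helper names `cmp_from_lt` and `sort_with_lt` are hypothetical.

```python
import functools


def cmp_from_lt(lt):
    """Turn a strict less-than predicate into a three-way comparator."""
    def cmp(a, b):
        # a < b:   (not True) - (not False)  = 0 - 1 = -1
        # b < a:   (not False) - (not True)  = 1 - 0 = +1
        # neither: (not False) - (not False) = 1 - 1 =  0
        return (not lt(a, b)) - (not lt(b, a))
    return cmp


def sort_with_lt(items, lt):
    # Python 3 stand-in for the Python 2 items.sort(cmp=...) call.
    return sorted(items, key=functools.cmp_to_key(cmp_from_lt(lt)))
```

After an ascending sort like this, `buffer[-n:]` holds the n largest items and `buffer[-n]` is the new threshold, which is exactly the pair the diff returns as the accumulator.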
[1/2] incubator-beam git commit: Optimize Map and Flatmap when there are no side inputs.

2016-07-28 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b4716d9dc -> 351c3831d


Optimize Map and Flatmap when there are no side inputs.

varargs and kwargs are expensive, even when they're empty.

This is especially true for otherwise one-argument Python calls
which are special cased in CPython.
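The diff's `_fn_takes_side_inputs` relies on Python 2's `inspect.getargspec` and `im_self`. A rough Python 3 sketch of the same idea uses `inspect.signature`, which already omits `self` for bound methods. The names `fn_takes_side_inputs` and `make_map_wrapper` are hypothetical, and any speedup from skipping `*args`/`**kwargs` should be measured (for example with `timeit`) rather than assumed.

```python
import inspect


def fn_takes_side_inputs(fn):
    """Conservatively guess whether fn can accept more than one argument."""
    try:
        sig = inspect.signature(fn)
    except (TypeError, ValueError):
        return True  # can't tell; maybe it does
    params = list(sig.parameters.values())
    if any(p.kind in (p.VAR_POSITIONAL, p.VAR_KEYWORD) for p in params):
        return True
    # Bound methods already exclude self from the signature.
    return len(params) > 1


def make_map_wrapper(fn):
    """Build a Map-style wrapper, skipping *args/**kwargs when unnecessary."""
    if fn_takes_side_inputs(fn):
        return lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
    return lambda x: [fn(x)]
```

Returning `True` when the signature cannot be inspected keeps the fallback safe: the general wrapper still works for one-argument functions, it is just slower.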


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d2fb1f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d2fb1f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d2fb1f8

Branch: refs/heads/python-sdk
Commit: 7d2fb1f88d1a2370dd4053f3a1738cbb9838cc2f
Parents: b4716d9
Author: Robert Bradshaw <rober...@google.com>
Authored: Wed Jul 27 18:29:59 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Jul 28 11:09:48 2016 -0700

--
 sdks/python/apache_beam/transforms/core.py | 40 ++---
 1 file changed, 30 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d2fb1f8/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5e6aafc..38b9cd2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -20,6 +20,8 @@
 from __future__ import absolute_import
 
 import copy
+import inspect
+import types
 
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -194,6 +196,16 @@ class DoFn(WithTypeHints):
   return type_hint
 
 
+def _fn_takes_side_inputs(fn):
+  try:
+    argspec = inspect.getargspec(fn)
+  except TypeError:
+    # We can't tell; maybe it does.
+    return True
+  is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None
+  return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
+
+
 class CallableWrapperDoFn(DoFn):
   """A DoFn (function) object wrapping a callable object.
 
@@ -214,6 +226,11 @@ class CallableWrapperDoFn(DoFn):
       raise TypeError('Expected a callable object instead of: %r' % fn)
 
     self._fn = fn
+    if _fn_takes_side_inputs(fn):
+      self.process = lambda context, *args, **kwargs: fn(
+          context.element, *args, **kwargs)
+    else:
+      self.process = lambda context: fn(context.element)
 
     super(CallableWrapperDoFn, self).__init__()
 
@@ -237,9 +254,6 @@ class CallableWrapperDoFn(DoFn):
     return self._strip_output_annotations(
         trivial_inference.infer_return_type(self._fn, [input_type]))
 
-  def process(self, context, *args, **kwargs):
-    return self._fn(context.element, *args, **kwargs)
-
   def process_argspec_fn(self):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
@@ -676,7 +690,10 @@ def Map(fn_or_label, *args, **kwargs):  # pylint: disable=invalid-name
         'Map can be used only with callable objects. '
         'Received %r instead for %s argument.'
         % (fn, 'first' if label is None else 'second'))
-  wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+  if _fn_takes_side_inputs(fn):
+    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+  else:
+    wrapper = lambda x: [fn(x)]
 
   # Proxy the type-hint information from the original function to this new
   # wrapped function.
@@ -1008,21 +1025,24 @@ class GroupByKey(PTransform):
       value_type = windowed_value_iter_type.inner_type.inner_type
       return Iterable[KV[key_type, Iterable[value_type]]]
 
-    def process(self, context):
-      k, vs = context.element
+    def start_bundle(self, context):
       # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.transforms.trigger import InMemoryUnmergedState
       from apache_beam.transforms.trigger import create_trigger_driver
       # pylint: enable=wrong-import-order, wrong-import-position
-      driver = create_trigger_driver(self.windowing, True)
-      state = InMemoryUnmergedState()
+      self.driver = create_trigger_driver(self.windowing, True)
+      self.state_type = InMemoryUnmergedState
+
+    def process(self, context):
+      k, vs = context.element
+      state = self.state_type()
       # TODO(robertwb): Conditionally process in smaller chunks.
-      for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
+      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
         yield wvalue.with_value((k, wvalue.value))
       while state.timers:
         fired = state.get_and_clear_timers()
         for timer_window, (name, time_domain, fire_time) in fired:
-          for wvalue in driver.process_timer(
+          for wvalue in self.driver.process_timer(
               timer_window, na

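The `GroupByKey` hunk above applies a related optimization: the trigger driver is now built once in `start_bundle` instead of once per element, while the unmerged state is still created fresh for every key. A toy sketch of that pattern follows; the classes are hypothetical stand-ins that only mimic the shape of a DoFn and make the setup cost countable.

```python
class CountingFactory:
    """Stands in for an expensive setup step such as building a driver."""

    def __init__(self):
        self.calls = 0

    def __call__(self):
        self.calls += 1
        # The toy "driver" just pairs a key with its sorted values.
        return lambda key, values: (key, sorted(values))


class PerElementSetup:
    """Rebuilds the driver for every element, like the old process()."""

    def __init__(self, make_driver):
        self._make_driver = make_driver

    def process(self, element):
        driver = self._make_driver()
        key, values = element
        return driver(key, values)


class PerBundleSetup:
    """Builds the driver once per bundle, like the new start_bundle()."""

    def __init__(self, make_driver):
        self._make_driver = make_driver
        self.driver = None

    def start_bundle(self):
        self.driver = self._make_driver()

    def process(self, element):
        key, values = element
        # Per-key state would still be created here, once per element.
        return self.driver(key, values)


def run_bundle(fn, bundle):
    """Drive a toy DoFn over one bundle of elements."""
    if hasattr(fn, 'start_bundle'):
        fn.start_bundle()
    return [fn.process(e) for e in bundle]
```

The outputs are identical; only the number of setup calls changes, which is why state that must not leak between keys stays inside `process`.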
[GitHub] incubator-beam pull request #748: Optimize Map and Flatmap when there are no...

2016-07-28 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/748


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #741: Top improvements

2016-07-28 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/741




[2/2] incubator-beam git commit: Closes #748

2016-07-28 Thread robertwb
Closes #748


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/351c3831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/351c3831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/351c3831

Branch: refs/heads/python-sdk
Commit: 351c3831de1bdcfcb19b2f24f9f0b6a19e77e421
Parents: b4716d9 7d2fb1f
Author: Robert Bradshaw 
Authored: Thu Jul 28 11:09:49 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 28 11:09:49 2016 -0700

--
 sdks/python/apache_beam/transforms/core.py | 40 ++---
 1 file changed, 30 insertions(+), 10 deletions(-)
--




[04/12] incubator-beam git commit: Fix multi-input named PTransforms.

2016-07-23 Thread robertwb
Fix multi-input named PTransforms.

Now delegate the __ror__ logic entirely for the naming wrapper.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937cf69e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937cf69e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937cf69e

Branch: refs/heads/python-sdk
Commit: 937cf69e958d4a82fb274f311de248930298db69
Parents: 9fe102a
Author: Robert Bradshaw 
Authored: Fri Jul 22 14:32:33 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:45 2016 -0700

--
 sdks/python/apache_beam/transforms/ptransform.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937cf69e/sdks/python/apache_beam/transforms/ptransform.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da8b671..b652bca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -400,7 +400,7 @@ class PTransform(WithTypeHints):
     else:
       return NotImplemented
 
-  def __ror__(self, left):
+  def __ror__(self, left, label=None):
     """Used to apply this PTransform to non-PValues, e.g., a tuple."""
     pvalueish, pvalues = self._extract_input_pvalues(left)
     pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
@@ -434,7 +434,7 @@ class PTransform(WithTypeHints):
         if not isinstance(v, pvalue.PValue) and v is not None}
     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
     self.pipeline = p
-    result = p.apply(self, pvalueish)
+    result = p.apply(self, pvalueish, label)
     if deferred:
       return result
     else:
@@ -720,5 +720,8 @@ class _NamedPTransform(PTransform):
     super(_NamedPTransform, self).__init__(label)
     self.transform = transform
 
+  def __ror__(self, pvalueish):
+    return self.transform.__ror__(pvalueish, self.label)
+
   def apply(self, pvalue):
     raise RuntimeError("Should never be applied directly.")

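The delegation in this commit can be illustrated with toy classes (hypothetical stand-ins, not the Beam classes). In this sketch, `'label' >> transform` is resolved through the transform's reflected `__rrshift__`, which wraps it, and `pcoll | named` is resolved through the wrapper's reflected `__ror__`, which forwards the label to the wrapped transform's `__ror__` so the pipeline records the user-supplied name.

```python
class ToyPipeline:
    """Records the label of every transform applied to it."""

    def __init__(self):
        self.applied = []

    def apply(self, transform, pvalueish, label=None):
        self.applied.append(label or type(transform).__name__)
        return transform.expand(pvalueish)


class ToyPCollection:
    def __init__(self, pipeline, values):
        self.pipeline = pipeline
        self.values = values


class ToyTransform:
    def __rrshift__(self, label):
        # 'label' >> transform: str defines no __rshift__, so Python calls this.
        return NamedToyTransform(self, label)

    def __ror__(self, left, label=None):
        # pcoll | transform: ToyPCollection defines no __or__, so Python calls this.
        return left.pipeline.apply(self, left, label)


class NamedToyTransform(ToyTransform):
    def __init__(self, transform, label):
        self.transform = transform
        self.label = label

    def __ror__(self, left):
        # Delegate entirely to the wrapped transform, forwarding the label.
        return self.transform.__ror__(left, self.label)


class Double(ToyTransform):
    def expand(self, pcoll):
        return ToyPCollection(pcoll.pipeline, [2 * v for v in pcoll.values])
```

Because `>>` binds more tightly than `|`, `pc | 'double' >> Double()` first builds the named wrapper and only then applies it.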


[11/12] incubator-beam git commit: Final cleanup pass.

2016-07-23 Thread robertwb
Final cleanup pass.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c078fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c078fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c078fe

Branch: refs/heads/python-sdk
Commit: e3c078fe28553b7e7317316b6df51b4c570573ba
Parents: c5b5b14
Author: Robert Bradshaw 
Authored: Sat Jul 23 01:32:23 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:46 2016 -0700

--
 .../examples/complete/autocomplete.py|  4 ++--
 .../examples/complete/autocomplete_test.py   |  4 ++--
 .../apache_beam/examples/complete/estimate_pi.py |  5 ++---
 .../examples/complete/top_wikipedia_sessions.py  |  4 ++--
 .../complete/top_wikipedia_sessions_test.py  |  2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py  | 12 +---
 .../examples/cookbook/custom_ptransform.py   |  9 +
 .../examples/cookbook/custom_ptransform_test.py  |  2 +-
 .../apache_beam/examples/cookbook/filters.py |  4 ++--
 .../examples/cookbook/group_with_coder.py|  9 +
 .../examples/cookbook/multiple_output_pardo.py   | 14 +++---
 .../apache_beam/examples/snippets/snippets.py| 19 +--
 .../apache_beam/examples/streaming_wordcap.py|  4 ++--
 13 files changed, 45 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 10d9009..c3cd88f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -48,8 +48,8 @@ def run(argv=None):
    | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input))
    | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'TopPerPrefix' >> TopPerPrefix(5)
-   | beam.Map('format',
-              lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
+   | 'format' >> beam.Map(
+       lambda (prefix, candidates): '%s: %s' % (prefix, candidates))
    | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output)))
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete_test.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 18d0511..0d20482 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase):
 
   def test_top_prefixes(self):
     p = beam.Pipeline('DirectPipelineRunner')
-    words = p | 'create' >> beam.Create(self.WORDS)
-    result = words | 'test' >> autocomplete.TopPerPrefix(5)
+    words = p | beam.Create(self.WORDS)
+    result = words | autocomplete.TopPerPrefix(5)
     # values must be hashable for now
     result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
     assert_that(result, equal_to(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/estimate_pi.py
--
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index c33db1d..37c1aad 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -112,9 +112,8 @@ def run(argv=None):
   p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
-   | 'Estimate' >> EstimatePiTransform()
-   | beam.io.Write('Write',
-   beam.io.TextFileSink(known_args.output,
+   | EstimatePiTransform()
+   | beam.io.Write(beam.io.TextFileSink(known_args.output,
 coder=JsonCoder(
 
   # Actually run the pipeline (all operations above are deferred).

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
--
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 7468484..a48a383 100644
--- 

[09/12] incubator-beam git commit: Fix label-sensitive test.

2016-07-23 Thread robertwb
Fix label-sensitive test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a59a121
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a59a121
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a59a121

Branch: refs/heads/python-sdk
Commit: 2a59a121441a003bad949ae6a23d58a9cf2b3059
Parents: 2ff5630
Author: Robert Bradshaw 
Authored: Fri Jul 22 16:24:48 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:46 2016 -0700

--
 sdks/python/apache_beam/pipeline.py   |  4 
 sdks/python/apache_beam/transforms/ptransform_test.py | 14 +++---
 2 files changed, 11 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index aeed9f9..0572466 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -348,6 +348,10 @@ class AppliedPTransform(object):
 # root producer.
     self.refcounts = collections.defaultdict(int)
 
+  def __repr__(self):
+    return "%s(%s, %s)" % (self.__class__.__name__, self.full_label,
+                           type(self.transform).__name__)
+
   def update_input_refcounts(self):
     """Increment refcounts for all transforms providing inputs."""
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/transforms/ptransform_test.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 8121c1e..3a71ec3 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -54,28 +54,28 @@ class PTransformTest(unittest.TestCase):
 
 pa = Pipeline('DirectPipelineRunner')
 res = pa | 'a_label' >> beam.Create([1, 2])
-self.assertEqual('',
- str(res.producer.transform))
+self.assertEqual('AppliedPTransform(a_label, Create)',
+ str(res.producer))
 
 pc = Pipeline('DirectPipelineRunner')
-res = pc | 'with_inputs' >> beam.Create([1, 2])
+res = pc | beam.Create([1, 2])
 inputs_tr = res.producer.transform
 inputs_tr.inputs = ('ci',)
 self.assertEqual(
-"""""",
+"""""",
 str(inputs_tr))
 
 pd = Pipeline('DirectPipelineRunner')
-res = pd | 'with_sidei' >> beam.Create([1, 2])
+res = pd | beam.Create([1, 2])
 side_tr = res.producer.transform
 side_tr.side_inputs = (4,)
 self.assertEqual(
-'',
+'',
 str(side_tr))
 
 inputs_tr.side_inputs = ('cs',)
 self.assertEqual(
-"""""",
 str(inputs_tr))
 



[08/12] incubator-beam git commit: Fixes examples

2016-07-23 Thread robertwb
Fixes examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ff5630b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ff5630b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ff5630b

Branch: refs/heads/python-sdk
Commit: 2ff5630be06034e25d60e105bbe1a718ae06b4d6
Parents: 362f2e9
Author: Chamikara Jayalath 
Authored: Fri Jul 22 16:04:29 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:46 2016 -0700

--
 sdks/python/apache_beam/examples/complete/autocomplete.py | 4 ++--
 sdks/python/apache_beam/examples/complete/estimate_pi.py  | 3 ---
 sdks/python/apache_beam/examples/snippets/snippets.py | 8 
 3 files changed, 6 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/autocomplete.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index b68bc56..10d9009 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -56,8 +56,8 @@ def run(argv=None):
 
 class TopPerPrefix(beam.PTransform):
 
-  def __init__(self, label, count):
-    super(TopPerPrefix, self).__init__(label)
+  def __init__(self, count):
+    super(TopPerPrefix, self).__init__()
     self._count = count
 
   def apply(self, words):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/estimate_pi.py
--
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index ef9f8cc..c33db1d 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -90,9 +90,6 @@ class JsonCoder(object):
 class EstimatePiTransform(beam.PTransform):
   """Runs 10M trials, and combine the results to estimate pi."""
 
-  def __init__(self, label):
-    super(EstimatePiTransform, self).__init__(label)
-
   def apply(self, pcoll):
     # A hundred work items of a hundred thousand tries each.
     return (pcoll

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/snippets/snippets.py
--
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index c605db8..9d1df82 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames):
   # [END examples_wordcount_minimal_map]
 
   # [START examples_wordcount_minimal_write]
-  | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
+  | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
   # [END examples_wordcount_minimal_write]
   )
 
@@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames):
   formatted = counts | beam.ParDo(FormatAsTextFn())
   # [END examples_wordcount_wordcount_dofn]
 
-  formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink())
+  formatted |  beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()
 
@@ -702,8 +702,8 @@ def model_custom_source(count):
   # [START model_custom_source_new_ptransform]
   class ReadFromCountingSource(PTransform):
 
-    def __init__(self, label, count, **kwargs):
-      super(ReadFromCountingSource, self).__init__(label, **kwargs)
+    def __init__(self, count, **kwargs):
+      super(ReadFromCountingSource, self).__init__(**kwargs)
       self._count = count
 
     def apply(self, pcoll):



[10/12] incubator-beam git commit: Lint fixes.

2016-07-23 Thread robertwb
Lint fixes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca

Branch: refs/heads/python-sdk
Commit: b15d35ca6e585e75153e05d96403336889cc6894
Parents: 2a59a12
Author: Robert Bradshaw 
Authored: Fri Jul 22 18:35:22 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:46 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py|   7 +-
 .../complete/juliaset/juliaset/juliaset.py  |   9 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/cookbook/bigquery_side_input.py|  14 +-
 .../cookbook/bigquery_side_input_test.py|   4 +-
 .../examples/cookbook/bigquery_tornadoes.py |   6 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |   5 +-
 .../apache_beam/examples/cookbook/filters.py|   2 +-
 .../apache_beam/examples/snippets/snippets.py   |   2 +-
 .../examples/snippets/snippets_test.py  |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |   2 +-
 .../apache_beam/examples/wordcount_debugging.py |   2 +-
 .../apache_beam/examples/wordcount_minimal.py   |   2 +-
 sdks/python/apache_beam/io/bigquery.py  |   4 +-
 sdks/python/apache_beam/pipeline_test.py|   4 +-
 .../apache_beam/transforms/combiners_test.py|   4 +-
 sdks/python/apache_beam/transforms/core.py  |   7 +-
 .../apache_beam/transforms/ptransform_test.py   | 161 ++-
 sdks/python/apache_beam/transforms/util.py  |   3 +-
 .../typehints/typed_pipeline_test.py|   2 +-
 20 files changed, 127 insertions(+), 123 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index bf66851..cc3a526 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase):
 words = pipeline | 'SomeWords' >> Create(words_list)
 prefix = 'zyx'
 suffix = pipeline | 'SomeString' >> Create(['xyz'])  # side in
-result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix,
-   suffix=AsSingleton(suffix))
+result = words | 'DecorateWordsDoFn' >> ParDo(
+SomeDoFn(), prefix, suffix=AsSingleton(suffix))
 assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list]))
 pipeline.run()
 
@@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase):
 pipeline = Pipeline('DirectPipelineRunner')
 pcol = pipeline | 'start' >> Create([1, 2])
 side = pipeline | 'side' >> Create([])  # 0 values in side input.
-    result = (
-        pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)))
+    result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))
 assert_that(result, equal_to([10, 20]))
 pipeline.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
--
diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
index 56696c3..1445fbe 100644
--- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
+++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py
@@ -105,11 +105,12 @@ def run(argv=None):  # pylint: disable=missing-docstring
   # Group each coordinate triplet by its x value, then write the coordinates to
   # the output file with an x-coordinate grouping per line.
   # pylint: disable=expression-not-assigned
-  (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
-   | 'x coord' >> beam.GroupByKey() | beam.Map(
-       'format',
+  (coordinates
+   | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i)))
+   | 'x coord' >> beam.GroupByKey()
+   | 'format' >> beam.Map(
        lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords))
-   | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
+   | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output)))
   # pylint: enable=expression-not-assigned
   p.run()
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py
--
diff --git 

[03/12] incubator-beam git commit: Move names out of transform constructors.

2016-07-23 Thread robertwb
Move names out of transform constructors.

sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g'

Small number of tests will need to be fixed by hand.
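For readers less fluent in sed, here is the same substitution transcribed into Python's `re` module (the `'"'"'` sequences in the shell command are just a way of writing a literal `'` inside a single-quoted argument). Group 1 is the transform name, group 2 the quoted label, and group 4 captures a closing `)` when the label was the only argument. The helper name `move_label` is hypothetical; the sample inputs in the checks are lines taken from the diffs in this thread.

```python
import re

# Transcription of the sed expression: group 1 is the transform name, group 2
# the quoted label, and group 4 a ')' when the label was the only argument.
PATTERN = re.compile(r"""[|] (\S+)[(](["'][^"']+.)(, +|([)]))""")


def move_label(line):
    """Rewrite '| T('label', ...)' as "| 'label' >> T(...)"."""
    # An unmatched group 4 is substituted as '' (Python 3.5+).
    return PATTERN.sub(r"| \2 >> \1(\4", line)
```

One reason some call sites needed hand fixes: the pattern requires at least one space after the comma, so a label followed by a line break, such as the `| beam.Map('format',` line in the autocomplete example, is left unchanged.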


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce

Branch: refs/heads/python-sdk
Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f
Parents: 937cf69
Author: Robert Bradshaw 
Authored: Fri Jul 22 14:34:58 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:45 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py|  96 ++--
 .../examples/complete/autocomplete.py   |   8 +-
 .../examples/complete/autocomplete_test.py  |   4 +-
 .../examples/complete/estimate_pi.py|   8 +-
 .../examples/complete/estimate_pi_test.py   |   2 +-
 .../complete/juliaset/juliaset/juliaset.py  |   8 +-
 .../apache_beam/examples/complete/tfidf.py  |  32 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/complete/top_wikipedia_sessions.py |   8 +-
 .../complete/top_wikipedia_sessions_test.py |   2 +-
 .../examples/cookbook/bigquery_schema.py|   4 +-
 .../examples/cookbook/bigquery_side_input.py|   6 +-
 .../cookbook/bigquery_side_input_test.py|   8 +-
 .../examples/cookbook/bigquery_tornadoes.py |   6 +-
 .../cookbook/bigquery_tornadoes_test.py |   2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  18 +-
 .../apache_beam/examples/cookbook/coders.py |   2 +-
 .../examples/cookbook/coders_test.py|   4 +-
 .../examples/cookbook/custom_ptransform.py  |   6 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../apache_beam/examples/cookbook/filters.py|  10 +-
 .../examples/cookbook/filters_test.py   |   2 +-
 .../examples/cookbook/group_with_coder.py   |   6 +-
 .../examples/cookbook/mergecontacts.py  |   8 +-
 .../examples/cookbook/multiple_output_pardo.py  |  18 +-
 .../apache_beam/examples/snippets/snippets.py   |  62 +--
 .../examples/snippets/snippets_test.py  |  10 +-
 .../apache_beam/examples/streaming_wordcap.py   |   2 +-
 .../apache_beam/examples/streaming_wordcount.py |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |  14 +-
 .../apache_beam/examples/wordcount_debugging.py |  16 +-
 .../apache_beam/examples/wordcount_minimal.py   |  14 +-
 sdks/python/apache_beam/io/avroio.py|   2 +-
 sdks/python/apache_beam/io/bigquery.py  |   4 +-
 .../apache_beam/io/filebasedsource_test.py  |   4 +-
 sdks/python/apache_beam/io/iobase.py|   4 +-
 sdks/python/apache_beam/pipeline_test.py|  42 +-
 sdks/python/apache_beam/pvalue_test.py  |   6 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 sdks/python/apache_beam/runners/runner_test.py  |   6 +-
 .../apache_beam/transforms/combiners_test.py|  48 +-
 sdks/python/apache_beam/transforms/core.py  |  16 +-
 .../python/apache_beam/transforms/ptransform.py |   2 +-
 .../apache_beam/transforms/ptransform_test.py   | 498 +--
 sdks/python/apache_beam/transforms/util.py  |   8 +-
 .../apache_beam/transforms/window_test.py   |  14 +-
 .../transforms/write_ptransform_test.py |   2 +-
 .../typehints/typed_pipeline_test.py|  16 +-
 48 files changed, 537 insertions(+), 537 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index 476f8b2..bf66851 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase):
   def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
     """A Count transform: v, ... => (v, n), ..."""
     return (pcoll
-            | Map('AddCount', lambda x: (x, 1))
-            | GroupByKey('GroupCounts')
-            | Map('AddCounts', lambda (x, ones): (x, sum(ones
+            | 'AddCount' >> Map(lambda x: (x, 1))
+            | 'GroupCounts' >> GroupByKey()
+            | 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones
 
   def test_word_count(self):
     pipeline = Pipeline('DirectPipelineRunner')
-    lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA)
+    lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA)
     result = (
-        (lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x)))
+        (lines 

[2/4] incubator-beam git commit: Make save_main_session optional

2016-07-23 Thread robertwb
Make save_main_session optional
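The test changes below show the new behavior: with default options, `stage_job_resources` no longer stages the pickled main session, and that file is staged only when `save_main_session` is set to `True`. A toy sketch of such an opt-in default using `argparse`; the `--save_main_session` flag name mirrors the option used in the test, but the `make_parser` and `files_to_stage` helpers and the staged file names are hypothetical.

```python
import argparse


def make_parser():
    parser = argparse.ArgumentParser()
    # Opt-in: the main session is pickled only when the flag is passed.
    parser.add_argument('--save_main_session', action='store_true',
                        default=False)
    return parser


def files_to_stage(options, requirements_file=None):
    """Hypothetical staging list mirroring the expectations in the tests."""
    files = []
    if requirements_file:
        files.append(requirements_file)
    if options.save_main_session:
        files.append('pickled_main_session')  # hypothetical file name
    return files
```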


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e36602c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e36602c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e36602c

Branch: refs/heads/python-sdk
Commit: 2e36602c0a2944129698866cc31abc3bcac6169f
Parents: af6d380
Author: Silviu Calinoiu 
Authored: Sat Jul 23 09:31:30 2016 -0700
Committer: Silviu Calinoiu 
Committed: Sat Jul 23 09:31:30 2016 -0700

--
 .../python/apache_beam/utils/dependency_test.py | 33 
 sdks/python/apache_beam/utils/options.py|  2 +-
 2 files changed, 21 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e36602c/sdks/python/apache_beam/utils/dependency_test.py
--
diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py
index b7a9296..2f9a57b 100644
--- a/sdks/python/apache_beam/utils/dependency_test.py
+++ b/sdks/python/apache_beam/utils/dependency_test.py
@@ -81,10 +81,12 @@ class SetupTest(unittest.TestCase):
 [],
 dependency.stage_job_resources(options))
 
-  def test_default_resources(self):
+  def test_with_main_session(self):
 staging_dir = tempfile.mkdtemp()
 options = PipelineOptions()
+
 options.view_as(GoogleCloudOptions).staging_location = staging_dir
+options.view_as(SetupOptions).save_main_session = True
 self.update_options(options)
 
 self.assertEqual(
@@ -94,6 +96,16 @@ class SetupTest(unittest.TestCase):
 os.path.isfile(
 os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE)))
 
+  def test_default_resources(self):
+staging_dir = tempfile.mkdtemp()
+options = PipelineOptions()
+options.view_as(GoogleCloudOptions).staging_location = staging_dir
+self.update_options(options)
+
+self.assertEqual(
+[],
+dependency.stage_job_resources(options))
+
   def test_with_requirements_file(self):
 staging_dir = tempfile.mkdtemp()
 source_dir = tempfile.mkdtemp()
@@ -106,7 +118,7 @@ class SetupTest(unittest.TestCase):
 self.create_temp_file(
 os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
 self.assertEqual(
-sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
+sorted([dependency.REQUIREMENTS_FILE,
 'abc.txt', 'def.txt']),
 sorted(dependency.stage_job_resources(
 options,
@@ -145,7 +157,7 @@ class SetupTest(unittest.TestCase):
 self.create_temp_file(
 os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing')
 self.assertEqual(
-sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE,
+sorted([dependency.REQUIREMENTS_FILE,
 'abc.txt', 'def.txt']),
 sorted(dependency.stage_job_resources(
 options,
@@ -169,8 +181,7 @@ class SetupTest(unittest.TestCase):
 source_dir, 'setup.py')
 
 self.assertEqual(
-[dependency.WORKFLOW_TARBALL_FILE,
- names.PICKLED_MAIN_SESSION_FILE],
+[dependency.WORKFLOW_TARBALL_FILE],
 dependency.stage_job_resources(
 options,
 # We replace the build setup command because a realistic one would
@@ -265,8 +276,7 @@ class SetupTest(unittest.TestCase):
 options.view_as(SetupOptions).sdk_location = 'default'
 
 self.assertEqual(
-[names.PICKLED_MAIN_SESSION_FILE,
- names.DATAFLOW_SDK_TARBALL_FILE],
+[names.DATAFLOW_SDK_TARBALL_FILE],
 dependency.stage_job_resources(
 options,
 file_copy=dependency._dependency_file_copy))
@@ -286,8 +296,7 @@ class SetupTest(unittest.TestCase):
 options.view_as(SetupOptions).sdk_location = sdk_location
 
 self.assertEqual(
-[names.PICKLED_MAIN_SESSION_FILE,
- names.DATAFLOW_SDK_TARBALL_FILE],
+[names.DATAFLOW_SDK_TARBALL_FILE],
 dependency.stage_job_resources(options))
 tarball_path = os.path.join(
 staging_dir, names.DATAFLOW_SDK_TARBALL_FILE)
@@ -321,8 +330,7 @@ class SetupTest(unittest.TestCase):
 options.view_as(SetupOptions).sdk_location = sdk_location
 
 self.assertEqual(
-[names.PICKLED_MAIN_SESSION_FILE,
- names.DATAFLOW_SDK_TARBALL_FILE],
+[names.DATAFLOW_SDK_TARBALL_FILE],
 dependency.stage_job_resources(options))
 
   def test_with_extra_packages(self):
@@ -363,8 +371,7 @@ class SetupTest(unittest.TestCase):
 
 self.assertEqual(
 ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'gcs.tar.gz',
- 

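The updated tests expect `names.PICKLED_MAIN_SESSION_FILE` to be staged only when `SetupOptions.save_main_session` is `True`; the default resource list is now empty. A minimal sketch of that opt-in rule follows. `staged_files` and the literal file names are assumptions for illustration, not the SDK's `dependency.stage_job_resources` or its `names` constants.

```python
# Hypothetical file names; the real constants live in the SDK's modules.
REQUIREMENTS_FILE = 'requirements.txt'
PICKLED_MAIN_SESSION_FILE = 'pickled_main_session'


def staged_files(requirements_file=None, save_main_session=False):
    """Return the resource names a job would stage (illustrative only)."""
    resources = []
    if requirements_file is not None:
        resources.append(REQUIREMENTS_FILE)
    if save_main_session:
        # The pickled main session is opt-in rather than always staged.
        resources.append(PICKLED_MAIN_SESSION_FILE)
    return resources
```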
[4/4] incubator-beam git commit: Closes #723

2016-07-23 Thread robertwb
Closes #723


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9fe102a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9fe102a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9fe102a5

Branch: refs/heads/python-sdk
Commit: 9fe102a5cf477225108bef7f24df2299c1b15563
Parents: 47042ce 3d64a8c
Author: Robert Bradshaw 
Authored: Sat Jul 23 16:42:31 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:42:31 2016 -0700

--
 .../examples/complete/autocomplete.py   |  9 +++-
 .../examples/complete/estimate_pi.py|  8 ++-
 .../apache_beam/examples/complete/tfidf.py  |  8 ++-
 .../examples/complete/top_wikipedia_sessions.py | 10 +++-
 .../examples/cookbook/bigquery_side_input.py|  8 ++-
 .../apache_beam/examples/cookbook/bigshuffle.py |  9 +++-
 .../apache_beam/examples/cookbook/coders.py |  7 +++
 .../examples/cookbook/group_with_coder.py   |  9 +++-
 .../examples/cookbook/mergecontacts.py  |  9 +++-
 .../examples/cookbook/multiple_output_pardo.py  |  9 +++-
 sdks/python/apache_beam/examples/wordcount.py   |  9 +++-
 .../apache_beam/examples/wordcount_debugging.py |  9 +++-
 .../apache_beam/examples/wordcount_minimal.py   |  9 +++-
 .../python/apache_beam/utils/dependency_test.py | 33 +++-
 sdks/python/apache_beam/utils/options.py|  2 +-
 sdks/python/setup.py| 56 +++-
 16 files changed, 144 insertions(+), 60 deletions(-)
--




[1/4] incubator-beam git commit: Refactor setup.py to separate strings/versions

2016-07-23 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 47042ce89 -> 9fe102a5c


Refactor setup.py to separate strings/versions


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af6d3804
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af6d3804
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af6d3804

Branch: refs/heads/python-sdk
Commit: af6d3804157dc445d9133ad5a8fe0717bb1f5786
Parents: 47042ce
Author: Silviu Calinoiu 
Authored: Sat Jul 23 09:29:06 2016 -0700
Committer: Silviu Calinoiu 
Committed: Sat Jul 23 09:29:06 2016 -0700

--
 sdks/python/setup.py | 56 ++-
 1 file changed, 31 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6d3804/sdks/python/setup.py
--
diff --git a/sdks/python/setup.py b/sdks/python/setup.py
index da2df87..b0a5c85 100644
--- a/sdks/python/setup.py
+++ b/sdks/python/setup.py
@@ -15,13 +15,32 @@
 # limitations under the License.
 #
 
-"""Apache Beam SDK setup configuration."""
+"""Apache Beam SDK for Python setup file."""
 
 import os
 import platform
 import setuptools
 
 
+def get_version():
+  global_names = {}
+  execfile(os.path.normpath('./apache_beam/version.py'),
+   global_names)
+  return global_names['__version__']
+
+PACKAGE_NAME = 'apache-beam-sdk'
+PACKAGE_VERSION = get_version()
+PACKAGE_DESCRIPTION = 'Apache Beam SDK for Python'
+PACKAGE_URL = 'https://beam.incubator.apache.org'
+PACKAGE_DOWNLOAD_URL = 'TBD'
+PACKAGE_AUTHOR = 'Apache Software Foundation'
+PACKAGE_EMAIL = 'd...@beam.incubator.apache.org'
+PACKAGE_KEYWORDS = 'apache beam'
+PACKAGE_LONG_DESCRIPTION = '''
+TBD
+'''
+
+
 # Currently all compiled modules are optional  (for performance only).
 if platform.system() == 'Windows':
   # Windows doesn't always provide int64_t.
@@ -34,27 +53,12 @@ else:
 cythonize = lambda *args, **kwargs: []
 
 
-def get_version():
-  global_names = {}
-  execfile(os.path.normpath('./apache_beam/version.py'),
-   global_names)
-  return global_names['__version__']
-
-
-# Configure the required packages and scripts to install.
 REQUIRED_PACKAGES = [
 'avro>=1.7.7',
 'dill>=0.2.5',
 'google-apitools>=0.5.2',
-# TODO(silviuc): Reenable api client package dependencies when we can
-# update the packages to the latest version without affecting previous
-# SDK releases.
-# 'google-apitools-bigquery-v2',
-# 'google-apitools-dataflow-v1b3>=0.4.20160217',
-# 'google-apitools-storage-v1',
 'httplib2>=0.8',
 'mock>=1.0.1',
-'nose>=1.0',
 'oauth2client>=2.0.1',
 'protorpc>=0.9.1',
 'python-gflags>=2.0',
@@ -63,14 +67,16 @@ REQUIRED_PACKAGES = [
 
 
 setuptools.setup(
-name='apache-beam-sdk',
-version=get_version(),
-description='Apache Beam SDK for Python',
-long_description='',
-url='https://beam.incubator.apache.org',
-download_url='TBD',
-author='Apache Software Foundation',
+name=PACKAGE_NAME,
+version=PACKAGE_VERSION,
+description=PACKAGE_DESCRIPTION,
+long_description=PACKAGE_LONG_DESCRIPTION,
+url=PACKAGE_URL,
+download_url=PACKAGE_DOWNLOAD_URL,
+author=PACKAGE_AUTHOR,
+author_email=PACKAGE_EMAIL,
 packages=setuptools.find_packages(),
+package_data={'apache_beam': ['**/*.pyx', '**/*.pxd']},
 ext_modules=cythonize([
 '**/*.pyx',
 'apache_beam/coders/coder_impl.py',
@@ -79,8 +85,8 @@ setuptools.setup(
 'apache_beam/utils/counters.py',
 'apache_beam/utils/windowed_value.py',
 ]),
+setup_requires=['nose>=1.0'],
 install_requires=REQUIRED_PACKAGES,
-package_data={'': ['*.pyx', '*.pxd']},
 test_suite='nose.collector',
 zip_safe=False,
 # PyPI package information.
@@ -93,5 +99,5 @@ setuptools.setup(
 'Topic :: Software Development :: Libraries :: Python Modules',
 ],
 license='Apache 2.0',
-keywords='apache beam',
+keywords=PACKAGE_KEYWORDS,
 )
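
The refactored `get_version()` reads `__version__` by executing `apache_beam/version.py` with `execfile`, which exists only in Python 2. A Python 3 sketch of the same technique, using `exec` on the file's contents, is below; the `path` parameter is an addition for testability and is not in the original.

```python
import os


def get_version(path='./apache_beam/version.py'):
    """Execute a version file and return the __version__ it defines."""
    global_names = {}
    with open(os.path.normpath(path)) as version_file:
        # Python 3 replacement for execfile(path, global_names).
        exec(compile(version_file.read(), path, 'exec'), global_names)
    return global_names['__version__']
```

Executing the version file, rather than importing the package, lets `setup.py` learn the version before any of the package's dependencies are installed.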



[3/4] incubator-beam git commit: Refactor examples to use save_main_session

2016-07-23 Thread robertwb
Refactor examples to use save_main_session


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d64a8c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d64a8c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d64a8c5

Branch: refs/heads/python-sdk
Commit: 3d64a8c5abf3dc9547ab75765cbd3e1ef5fac268
Parents: 2e36602
Author: Silviu Calinoiu 
Authored: Sat Jul 23 09:39:59 2016 -0700
Committer: Silviu Calinoiu 
Committed: Sat Jul 23 09:39:59 2016 -0700

--
 sdks/python/apache_beam/examples/complete/autocomplete.py |  9 +++--
 sdks/python/apache_beam/examples/complete/estimate_pi.py  |  8 +++-
 sdks/python/apache_beam/examples/complete/tfidf.py|  8 +++-
 .../examples/complete/top_wikipedia_sessions.py   | 10 --
 .../apache_beam/examples/cookbook/bigquery_side_input.py  |  8 +++-
 sdks/python/apache_beam/examples/cookbook/bigshuffle.py   |  9 +++--
 sdks/python/apache_beam/examples/cookbook/coders.py   |  7 +++
 .../apache_beam/examples/cookbook/group_with_coder.py |  9 +++--
 .../python/apache_beam/examples/cookbook/mergecontacts.py |  9 +++--
 .../examples/cookbook/multiple_output_pardo.py|  9 +++--
 sdks/python/apache_beam/examples/wordcount.py |  9 +++--
 sdks/python/apache_beam/examples/wordcount_debugging.py   |  9 +++--
 sdks/python/apache_beam/examples/wordcount_minimal.py |  9 +++--
 13 files changed, 92 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/autocomplete.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py
index 2fb2d92..0f1e96e 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete.py
@@ -24,6 +24,8 @@ import logging
 import re
 
 import apache_beam as beam
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 def run(argv=None):
@@ -36,8 +38,11 @@ def run(argv=None):
   required=True,
   help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
-
-  p = beam.Pipeline(argv=pipeline_args)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
   (p  # pylint: disable=expression-not-assigned
| beam.io.Read('read', beam.io.TextFileSource(known_args.input))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/estimate_pi.py
--
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 3c4a2d9..09faecf 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -36,6 +36,8 @@ import apache_beam as beam
 from apache_beam.typehints import Any
 from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
+from apache_beam.utils.options import PipelineOptions
+from apache_beam.utils.options import SetupOptions
 
 
 @beam.typehints.with_output_types(Tuple[int, int, int])
@@ -106,8 +108,12 @@ def run(argv=None):
   required=True,
   help='Output file to write results to.')
   known_args, pipeline_args = parser.parse_known_args(argv)
+  # We use the save_main_session option because one or more DoFn's in this
+  # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
+  pipeline_options.view_as(SetupOptions).save_main_session = True
+  p = beam.Pipeline(options=pipeline_options)
 
-  p = beam.Pipeline(argv=pipeline_args)
   (p  # pylint: disable=expression-not-assigned
| EstimatePiTransform('Estimate')
| beam.io.Write('Write',

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/tfidf.py
--
diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py
index cd85651..ef58cc0 100644
--- 

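The comment added to each example says `save_main_session` is set because DoFns rely on global context, such as a module imported at module level. The stdlib-only Python 3 sketch below reproduces that failure mode: it rebuilds a function from its code object with and without the module-level names it references. `rebuild` is a hypothetical helper; this illustrates the problem, not how Beam or its pickler actually restores the main session.

```python
import builtins
import re
import types


def split_words(line):
    # Depends on `re`, which is imported at module level (global context).
    return re.findall(r'\w+', line)


def rebuild(func, global_names):
    """Recreate `func` from its code object with a chosen globals dict."""
    namespace = {'__builtins__': builtins}
    namespace.update(global_names)
    return types.FunctionType(func.__code__, namespace, func.__name__)


# Function body without the module's globals: calling it raises NameError.
without_session = rebuild(split_words, {})
# Function body plus the module-level names it uses: calling it works.
with_session = rebuild(split_words, {'re': re})
```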
[02/12] incubator-beam git commit: Move names out of transform constructors.

2016-07-23 Thread robertwb
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/combiners_test.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index a747112..0439fe1 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -39,16 +39,16 @@ class CombineTest(unittest.TestCase):
 size = len(vals)
 
 # First for global combines.
-pcoll = pipeline | Create('start', vals)
-result_mean = pcoll | combine.Mean.Globally('mean')
-result_count = pcoll | combine.Count.Globally('count')
+pcoll = pipeline | 'start' >> Create(vals)
+result_mean = pcoll | 'mean' >> combine.Mean.Globally()
+result_count = pcoll | 'count' >> combine.Count.Globally()
 assert_that(result_mean, equal_to([mean]), label='assert:mean')
 assert_that(result_count, equal_to([size]), label='assert:size')
 
 # Again for per-key combines.
-pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals])
-result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey')
-result_key_count = pcoll | combine.Count.PerKey('count-perkey')
+pcoll = pipeline | 'start-perkey' >> Create([('a', x) for x in vals])
+result_key_mean = pcoll | 'mean-perkey' >> combine.Mean.PerKey()
+result_key_count = pcoll | 'count-perkey' >> combine.Count.PerKey()
 assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean')
 assert_that(result_key_count, equal_to([('a', size)]), label='key:size')
 pipeline.run()
@@ -66,9 +66,9 @@ class CombineTest(unittest.TestCase):
  9: 'nniiinne'}
 
 # First for global combines.
-pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-result_top = pcoll | combine.Top.Largest('top', 5)
-result_bot = pcoll | combine.Top.Smallest('bot', 4)
+pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+result_top = pcoll | 'top' >> combine.Top.Largest(5)
+result_bot = pcoll | 'bot' >> combine.Top.Smallest(4)
 result_cmp = pcoll | combine.Top.Of(
 'cmp',
 6,
@@ -81,8 +81,8 @@ class CombineTest(unittest.TestCase):
 # Again for per-key combines.
 pcoll = pipeline | Create(
 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5)
-result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4)
+result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
+result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
 result_key_cmp = pcoll | combine.Top.PerKey(
 'cmp-perkey',
 6,
@@ -99,15 +99,15 @@ class CombineTest(unittest.TestCase):
   def test_top_shorthands(self):
 pipeline = Pipeline('DirectPipelineRunner')
 
-pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
-result_top = pcoll | beam.CombineGlobally('top', combine.Largest(5))
-result_bot = pcoll | beam.CombineGlobally('bot', combine.Smallest(4))
+pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6])
+result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5))
+result_bot = pcoll | 'bot' >> beam.CombineGlobally(combine.Smallest(4))
 assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
 assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
 
 pcoll = pipeline | Create(
 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
-result_ktop = pcoll | beam.CombinePerKey('top-perkey', combine.Largest(5))
+result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
 result_kbot = pcoll | beam.CombinePerKey(
 'bot-perkey', combine.Smallest(4))
 assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
@@ -119,7 +119,7 @@ class CombineTest(unittest.TestCase):
 # First test global samples (lots of them).
 for ix in xrange(300):
   pipeline = Pipeline('DirectPipelineRunner')
-  pcoll = pipeline | Create('start', [1, 1, 2, 2])
+  pcoll = pipeline | 'start' >> Create([1, 1, 2, 2])
   result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3)
 
   def matcher():
@@ -141,7 +141,7 @@ class CombineTest(unittest.TestCase):
 pcoll = pipeline | Create(
 'start-perkey',
 sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
-result = pcoll | combine.Sample.FixedSizePerKey('sample', 3)
+result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
 
 def matcher():
   def match(actual):
@@ -158,7 +158,7 @@ class CombineTest(unittest.TestCase):
 p = Pipeline('DirectPipelineRunner')
 result = (
 p
-| Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])

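The combiner tests assert that `combine.Top.Largest(5)` and `combine.Top.Smallest(4)` over `[6, 3, 1, 1, 9, 1, 5, 2, 0, 6]` yield `[9, 6, 6, 5, 3]` and `[0, 1, 1, 1]`. On in-memory data, the standard library's `heapq.nlargest` and `heapq.nsmallest` compute the same lists, and grouping by key first gives a per-key analogue. This is a local cross-check of the expected values, not Beam's distributed implementation; the helper names are hypothetical.

```python
import heapq
from collections import defaultdict

VALUES = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]


def top_largest(values, n):
    # n largest values, in descending order.
    return heapq.nlargest(n, values)


def top_smallest(values, n):
    # n smallest values, in ascending order.
    return heapq.nsmallest(n, values)


def top_largest_per_key(pairs, n):
    # Group (key, value) pairs by key, then take the n largest per key.
    grouped = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return sorted((key, heapq.nlargest(n, vals)) for key, vals in grouped.items())
```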
[05/12] incubator-beam git commit: fixup: failing tests expecting name

2016-07-23 Thread robertwb
fixup: failing tests expecting name


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5b5b14d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5b5b14d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5b5b14d

Branch: refs/heads/python-sdk
Commit: c5b5b14d35fc7f6b0a576f0fca19b730015e1282
Parents: b15d35c
Author: Robert Bradshaw 
Authored: Sat Jul 23 01:07:44 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:46 2016 -0700

--
 sdks/python/apache_beam/transforms/ptransform_test.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5b5b14d/sdks/python/apache_beam/transforms/ptransform_test.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 992f944..b99cd26 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -931,7 +931,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 with self.assertRaises(typehints.TypeCheckError) as e:
   (self.p
| beam.Create([1, 2, 3]).with_output_types(int)
-   | beam.GroupByKeyOnly())
+   | 'F' >> beam.GroupByKeyOnly())
 
 self.assertEqual("Input type hint violation at F: "
  "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -945,7 +945,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
   (self.p
| (beam.Create(range(5))
   .with_output_types(typehints.Iterable[int]))
-   | beam.GroupByKey())
+   | 'T' >> beam.GroupByKey())
 
 self.assertEqual("Input type hint violation at T: "
  "expected Tuple[TypeVariable[K], TypeVariable[V]], "
@@ -1563,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase):
 
 with self.assertRaises(typehints.TypeCheckError) as e:
   (self.p
-   | beam.Create([1, 1, 2, 3])
+   | 'f' >> beam.Create([1, 1, 2, 3])
| 'count elems' >> combine.Count.PerElement())
 
 self.assertEqual('Pipeline type checking is enabled, however no output '

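This fixup labels the offending transforms `'F'` and `'T'` because the expected messages, such as `"Input type hint violation at F: expected Tuple[...]"`, embed the transform's label. The hypothetical `group_by_key` below shows the idea with a runtime check rather than Beam's type hints: it accepts only `(key, value)` pairs and names the failing step in its error.

```python
from collections import OrderedDict


def group_by_key(label, elements):
    """Group (key, value) pairs; reject other inputs with a labeled error."""
    grouped = OrderedDict()
    for element in elements:
        if not (isinstance(element, tuple) and len(element) == 2):
            # Including the label tells the user which step received bad input.
            raise TypeError(
                'Input type violation at %s: expected a (key, value) tuple, '
                'got %r' % (label, element))
        key, value = element
        grouped.setdefault(key, []).append(value)
    return list(grouped.items())
```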


[12/12] incubator-beam git commit: Closes #718

2016-07-23 Thread robertwb
Closes #718


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38d9dea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38d9dea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38d9dea2

Branch: refs/heads/python-sdk
Commit: 38d9dea2e62af280e4b9c258cedee70d6bcaa8ca
Parents: 9fe102a e3c078f
Author: Robert Bradshaw 
Authored: Sat Jul 23 16:43:47 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:47 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py|  99 ++--
 .../examples/complete/autocomplete.py   |  16 +-
 .../examples/complete/autocomplete_test.py  |   4 +-
 .../examples/complete/estimate_pi.py|  14 +-
 .../examples/complete/estimate_pi_test.py   |   2 +-
 .../complete/juliaset/juliaset/juliaset.py  |  11 +-
 .../apache_beam/examples/complete/tfidf.py  |  32 +-
 .../apache_beam/examples/complete/tfidf_test.py |   2 +-
 .../examples/complete/top_wikipedia_sessions.py |   8 +-
 .../complete/top_wikipedia_sessions_test.py |   2 +-
 .../examples/cookbook/bigquery_schema.py|   4 +-
 .../examples/cookbook/bigquery_side_input.py|  18 +-
 .../cookbook/bigquery_side_input_test.py|  12 +-
 .../examples/cookbook/bigquery_tornadoes.py |  10 +-
 .../cookbook/bigquery_tornadoes_test.py |   2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py |  31 +-
 .../apache_beam/examples/cookbook/coders.py |   2 +-
 .../examples/cookbook/coders_test.py|   4 +-
 .../examples/cookbook/custom_ptransform.py  |   9 +-
 .../examples/cookbook/custom_ptransform_test.py |   2 +-
 .../apache_beam/examples/cookbook/filters.py|  14 +-
 .../examples/cookbook/filters_test.py   |   2 +-
 .../examples/cookbook/group_with_coder.py   |   9 +-
 .../examples/cookbook/mergecontacts.py  |   8 +-
 .../examples/cookbook/multiple_output_pardo.py  |  30 +-
 .../apache_beam/examples/snippets/snippets.py   |  69 ++-
 .../examples/snippets/snippets_test.py  |  16 +-
 .../apache_beam/examples/streaming_wordcap.py   |   6 +-
 .../apache_beam/examples/streaming_wordcount.py |   8 +-
 sdks/python/apache_beam/examples/wordcount.py   |  16 +-
 .../apache_beam/examples/wordcount_debugging.py |  18 +-
 .../apache_beam/examples/wordcount_minimal.py   |  16 +-
 sdks/python/apache_beam/io/avroio.py|   2 +-
 sdks/python/apache_beam/io/bigquery.py  |   4 +-
 .../apache_beam/io/filebasedsource_test.py  |   4 +-
 sdks/python/apache_beam/io/iobase.py|   4 +-
 sdks/python/apache_beam/pipeline.py |  13 +
 sdks/python/apache_beam/pipeline_test.py|  48 +-
 sdks/python/apache_beam/pvalue_test.py  |   6 +-
 .../consumer_tracking_pipeline_visitor_test.py  |   4 +-
 sdks/python/apache_beam/runners/runner_test.py  |   6 +-
 .../apache_beam/transforms/combiners_test.py|  64 ++-
 sdks/python/apache_beam/transforms/core.py  |  21 +-
 .../python/apache_beam/transforms/ptransform.py |   9 +-
 .../apache_beam/transforms/ptransform_test.py   | 521 ++-
 sdks/python/apache_beam/transforms/util.py  |   9 +-
 .../apache_beam/transforms/window_test.py   |  14 +-
 .../transforms/write_ptransform_test.py |   2 +-
 .../typehints/typed_pipeline_test.py|  16 +-
 49 files changed, 628 insertions(+), 615 deletions(-)
--




[07/12] incubator-beam git commit: Cleanup and fix combiners_test.

2016-07-23 Thread robertwb
Cleanup and fix combiners_test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01830510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01830510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01830510

Branch: refs/heads/python-sdk
Commit: 01830510bded3c22ddd96937f5d83547702a4385
Parents: 7c186ce
Author: Robert Bradshaw 
Authored: Fri Jul 22 16:05:23 2016 -0700
Committer: Robert Bradshaw 
Committed: Sat Jul 23 16:43:46 2016 -0700

--
 .../apache_beam/transforms/combiners_test.py| 20 +---
 1 file changed, 9 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01830510/sdks/python/apache_beam/transforms/combiners_test.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py
index 0439fe1..c970382 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -79,12 +79,11 @@ class CombineTest(unittest.TestCase):
 assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
 
 # Again for per-key combines.
-pcoll = pipeline | Create(
-'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+pcoll = pipeline | 'start-perkye' >> Create(
+[('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
 result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
 result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
-result_key_cmp = pcoll | combine.Top.PerKey(
-'cmp-perkey',
+result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey(
 6,
 lambda a, b, names: len(names[a]) < len(names[b]),
 names)  # Note parameter passed to comparator.
@@ -105,11 +104,11 @@ class CombineTest(unittest.TestCase):
 assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
 assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
 
-pcoll = pipeline | Create(
-'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+pcoll = pipeline | 'start-perkey' >> Create(
+[('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
 result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5))
-result_kbot = pcoll | beam.CombinePerKey(
-'bot-perkey', combine.Smallest(4))
+result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey(
+combine.Smallest(4))
 assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
 assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
 pipeline.run()
@@ -138,8 +137,7 @@ class CombineTest(unittest.TestCase):
 
 # Now test per-key samples.
 pipeline = Pipeline('DirectPipelineRunner')
-pcoll = pipeline | Create(
-'start-perkey',
+pcoll = pipeline | 'start-perkey' >> Create(
 sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
 result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
 
@@ -158,7 +156,7 @@ class CombineTest(unittest.TestCase):
 p = Pipeline('DirectPipelineRunner')
 result = (
 p
-| 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+| Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
 | beam.CombineGlobally(combine.TupleCombineFn(max,
   combine.MeanCombineFn(),
   sum)).without_defaults())

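The last hunk restores the input to the `TupleCombineFn(max, MeanCombineFn(), sum)` test. The mechanical rename had turned the leading `'a'` of `('a', 100, 0.0)` into a label, which left a two-field tuple among three-field rows. A plain-Python sketch of a tuple-wise combine makes the dependency on row shape explicit: each field is reduced by its own function, so every row needs one field per function. `tuple_combine` and `mean` are hypothetical helpers, not Beam's `TupleCombineFn` or `MeanCombineFn`.

```python
def tuple_combine(fns, rows):
    """Combine each field of `rows` with the matching function in `fns`."""
    rows = list(rows)
    for row in rows:
        if len(row) != len(fns):
            raise ValueError('expected %d fields, got %r' % (len(fns), row))
    # Transpose rows into one column per field, then reduce each column.
    columns = zip(*rows)
    return tuple(fn(column) for fn, column in zip(fns, columns))


def mean(values):
    values = list(values)
    return sum(values) / float(len(values))


ROWS = [('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]
result = tuple_combine([max, mean, sum], ROWS)  # ('c', 37.0, 99.0)
```

With the mangled input `[(100, 0.0), ('b', 10, -1), ('c', 1, 100)]`, this sketch raises `ValueError` instead of silently combining mismatched fields.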


[GitHub] incubator-beam pull request #718: Change all examples and tests to use the n...

2016-07-23 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/718


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] incubator-beam pull request #705: Minor cdef value changes.

2016-07-21 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/705

Minor cdef value changes.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-<Jira issue #>] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `<Jira issue #>` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam cdef-structs-edit

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/705.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #705


commit 4ac5f8643585e9b57b8e1eaf309810efd1c0163e
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-21T17:36:36Z

Minor cdef value changes.






[GitHub] incubator-beam pull request #721: Make DoFnRunner a Receiver.

2016-07-22 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/721

Make DoFnRunner a Receiver.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam dofn-receiver

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/721.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #721


commit 943adfd6327321f09d60ae4302aeb470aa57e558
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-23T00:33:57Z

Make DoFnRunner a Receiver.






[GitHub] incubator-beam pull request #705: Minor cdef value changes.

2016-07-22 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/705




[GitHub] incubator-beam pull request #702: Add tests for WindowedValue.

2016-07-22 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/702




[2/2] incubator-beam git commit: Add tests for WindowedValue.

2016-07-21 Thread robertwb
Add tests for WindowedValue.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/441fad31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/441fad31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/441fad31

Branch: refs/heads/python-sdk
Commit: 441fad31ba42e14cd305277060b7e26a97f68166
Parents: e500699
Author: Robert Bradshaw 
Authored: Wed Jul 20 14:10:11 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 21 17:44:02 2016 -0700

--
 .../apache_beam/utils/windowed_value_test.py| 71 
 1 file changed, 71 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/441fad31/sdks/python/apache_beam/utils/windowed_value_test.py
--
diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py
new file mode 100644
index 000..f257410
--- /dev/null
+++ b/sdks/python/apache_beam/utils/windowed_value_test.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the windowed_value."""
+
+import copy
+import pickle
+import unittest
+
+from apache_beam.utils import windowed_value
+from apache_beam.transforms.timeutil import Timestamp
+
+
+class WindowedValueTest(unittest.TestCase):
+
+  def test_timestamps(self):
+    wv = windowed_value.WindowedValue(None, 3, ())
+    self.assertEqual(wv.timestamp, Timestamp.of(3))
+    self.assertTrue(wv.timestamp is wv.timestamp)
+    self.assertEqual(windowed_value.WindowedValue(None, -2.5, ()).timestamp,
+                     Timestamp.of(-2.5))
+
+  def test_with_value(self):
+    wv = windowed_value.WindowedValue(1, 3, ())
+    self.assertEqual(wv.with_value(10), windowed_value.WindowedValue(10, 3, ()))
+
+  def test_equality(self):
+    self.assertEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(1, 3, ()))
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(100, 3, ()))
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(1, 300, ()))
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(1, 300, ((),)))
+
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        object())
+
+  def test_hash(self):
+    wv = windowed_value.WindowedValue(1, 3, ())
+    wv_copy = copy.copy(wv)
+    self.assertFalse(wv is wv_copy)
+    self.assertEqual({wv: 100}.get(wv_copy), 100)
+
+  def test_pickle(self):
+    wv = windowed_value.WindowedValue(1, 3, ())
+    self.assertTrue(pickle.loads(pickle.dumps(wv)) == wv)
+
+
+if __name__ == '__main__':
+  unittest.main()
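The tests above pin down value semantics for WindowedValue: equality over value, timestamp, and windows; inequality against unrelated objects; equal hashes for equal instances; and `with_value` keeping the timestamp and windows. Purely as an illustration of those contracts, here is a minimal, hypothetical `SimpleWindowedValue` in plain Python. It is not Beam's implementation, and it stores the timestamp as given rather than converting it to a `Timestamp`.

```python
class SimpleWindowedValue(object):
  """Hypothetical (value, timestamp, windows) triple; not Beam's class."""

  def __init__(self, value, timestamp, windows):
    self.value = value
    self.timestamp = timestamp
    self.windows = tuple(windows)

  def with_value(self, new_value):
    # Same timestamp and windows, new payload.
    return SimpleWindowedValue(new_value, self.timestamp, self.windows)

  def _key(self):
    return (self.value, self.timestamp, self.windows)

  def __eq__(self, other):
    # Unrelated types defer to Python's default comparison (not equal).
    if not isinstance(other, SimpleWindowedValue):
      return NotImplemented
    return self._key() == other._key()

  def __hash__(self):
    # Equal instances (e.g. a copy) hash equally, so they share dict entries.
    return hash(self._key())
```

Defining `__hash__` from the same key as `__eq__` is what makes the `{wv: 100}.get(wv_copy)` style of check succeed.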



[1/2] incubator-beam git commit: Closes #702

2016-07-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e5006991d -> 09b4daf23


Closes #702


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09b4daf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09b4daf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09b4daf2

Branch: refs/heads/python-sdk
Commit: 09b4daf23fc80ad565dbdcd8704c1b4c01d879fc
Parents: e500699 441fad3
Author: Robert Bradshaw 
Authored: Thu Jul 21 17:44:02 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 21 17:44:02 2016 -0700

--
 .../apache_beam/utils/windowed_value_test.py| 71 
 1 file changed, 71 insertions(+)
--




[1/4] incubator-beam git commit: Remove expensive per-element-step logging context.

2016-07-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 37f426faf -> d78b38d1d


Remove expensive per-element-step logging context.

This is 3-4% of the total runtime.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f99aa77f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f99aa77f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f99aa77f

Branch: refs/heads/python-sdk
Commit: f99aa77f9acf5dbfdf762913a1f997d2c3658d3a
Parents: 37f426f
Author: Robert Bradshaw 
Authored: Thu Jul 21 12:08:33 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 21 17:35:49 2016 -0700

--
 sdks/python/apache_beam/runners/common.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f99aa77f/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index ef28c63..134fb06 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -95,9 +95,8 @@ class DoFnRunner(object):
 
   def process(self, element):
     try:
-      with self.logger.PerThreadLoggingContext(step_name=self.step_name):
-        self.context.set_element(element)
-        self._process_outputs(element, self.dofn.process(self.context))
+      self.context.set_element(element)
+      self._process_outputs(element, self.dofn.process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
 



[4/4] incubator-beam git commit: Restore (faster) logging context.

2016-07-21 Thread robertwb
Restore (faster) logging context.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf9e3a3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf9e3a3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf9e3a3

Branch: refs/heads/python-sdk
Commit: ecf9e3a3cc3dcbb3413403d4558c95d4a0097350
Parents: 7c9d77a
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Jul 21 15:24:01 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd |  8 +++-
 sdks/python/apache_beam/runners/common.py  | 20 ++--
 2 files changed, 21 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index f01a362..7191659 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -27,7 +27,7 @@ cdef class DoFnRunner(object):
   cdef object window_fn
   cdef object context   # TODO(robertwb): Make this a DoFnContext
   cdef object tagged_receivers
-  cdef object logger
+  cdef object logging_context  # TODO(robertwb): Make this a LoggingContext
   cdef object step_name
 
   cdef object main_receivers   # TODO(robertwb): Make this a Receiver
@@ -44,3 +44,9 @@ cdef class DoFnContext(object):
 
 cdef class Receiver(object):
   cdef receive(self, WindowedValue windowed_value)
+
+
+cdef class LoggingContext(object):
+  # TODO(robertwb): Optimize "with [cdef class]"
+  cpdef enter(self)
+  cpdef exit(self)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 80db823..a565645 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -29,14 +29,12 @@ from apache_beam.transforms.window import WindowFn
 from apache_beam.utils.windowed_value import WindowedValue
 
 
-class FakeLogger(object):
-  def PerThreadLoggingContext(self, *unused_args, **unused_kwargs):
-    return self
+class LoggingContext(object):
 
-  def __enter__(self):
+  def enter(self):
     pass
 
-  def __exit__(self, *unused_args):
+  def exit(self):
     pass
 
 
@@ -76,7 +74,8 @@ class DoFnRunner(object):
     self.window_fn = windowing.windowfn
     self.context = context
     self.tagged_receivers = tagged_receivers
-    self.logger = logger or FakeLogger()
+    self.logging_context = (logger.PerThreadLoggingContext(step_name=step_name)
+                            if logger else LoggingContext())
     self.step_name = step_name
 
     # Optimize for the common case.
@@ -85,23 +84,32 @@ class DoFnRunner(object):
   def start(self):
     self.context.set_element(None)
     try:
+      self.logging_context.enter()
       self._process_outputs(None, self.dofn.start_bundle(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
+    finally:
+      self.logging_context.exit()
 
   def finish(self):
     self.context.set_element(None)
     try:
+      self.logging_context.enter()
       self._process_outputs(None, self.dofn.finish_bundle(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
+    finally:
+      self.logging_context.exit()
 
   def process(self, element):
     try:
+      self.logging_context.enter()
       self.context.set_element(element)
       self._process_outputs(element, self.dofn_process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
+    finally:
+      self.logging_context.exit()
 
   def reraise_augmented(self, exn):
     if getattr(exn, '_tagged_with_step', False) or not self.step_name:
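This commit replaces the `with` block by explicit `enter()`/`exit()` calls, with `exit()` in a `finally` clause. The TODO in common.pxd ("Optimize "with [cdef class]"") suggests the motivation is that `with` on a cdef class was not yet optimized. The pairing guarantee itself can be sketched in plain Python; `NoOpLoggingContext`, `RecordingLoggingContext`, and `run_with_context` are hypothetical names, not Beam APIs.

```python
class NoOpLoggingContext(object):
  """Hypothetical default context: entering and exiting do nothing."""

  def enter(self):
    pass

  def exit(self):
    pass


class RecordingLoggingContext(object):
  """Hypothetical context that records calls, to show how they pair up."""

  def __init__(self):
    self.events = []

  def enter(self):
    self.events.append('enter')

  def exit(self):
    self.events.append('exit')


def run_with_context(logging_context, fn, element):
  """Calls fn(element) between enter() and exit().

  As in the commit, enter() sits inside the try, and exit() is in a
  finally clause, so exit() runs on success and on any exception.
  """
  try:
    logging_context.enter()
    return fn(element)
  finally:
    logging_context.exit()
```

Passing `NoOpLoggingContext()` when no logger is configured keeps the per-element path free of conditionals, mirroring the `if logger else LoggingContext()` choice in the constructor.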



[3/4] incubator-beam git commit: Closes #706

2016-07-21 Thread robertwb
Closes #706


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d78b38d1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d78b38d1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d78b38d1

Branch: refs/heads/python-sdk
Commit: d78b38d1dab75f65aa6dfc3bca53d58bafc1d37a
Parents: 37f426f ecf9e3a
Author: Robert Bradshaw 
Authored: Thu Jul 21 17:36:04 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 21 17:36:04 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd |  9 +++-
 sdks/python/apache_beam/runners/common.py  | 28 +
 2 files changed, 27 insertions(+), 10 deletions(-)
--




[2/4] incubator-beam git commit: Cache dofn.process method.

2016-07-21 Thread robertwb
Cache dofn.process method.

Saves another couple percent.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c9d77ac
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c9d77ac
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c9d77ac

Branch: refs/heads/python-sdk
Commit: 7c9d77ac2509c7625c5709f885f8b5aadb4f9f74
Parents: f99aa77
Author: Robert Bradshaw <rober...@google.com>
Authored: Thu Jul 21 12:11:03 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 21 17:36:04 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd | 1 +
 sdks/python/apache_beam/runners/common.py  | 5 -
 2 files changed, 5 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index e855376..f01a362 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -23,6 +23,7 @@ cdef type SideOutputValue, TimestampedValue
 cdef class DoFnRunner(object):
 
   cdef object dofn
+  cdef object dofn_process
   cdef object window_fn
   cdef object context   # TODO(robertwb): Make this a DoFnContext
   cdef object tagged_receivers

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 134fb06..80db823 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -56,6 +56,7 @@ class DoFnRunner(object):
step_name=None):
     if not args and not kwargs:
       self.dofn = fn
+      self.dofn_process = fn.process
     else:
       args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs)
 
@@ -70,6 +71,8 @@ class DoFnRunner(object):
         def finish_bundle(self, context):
           return fn.finish_bundle(context)
       self.dofn = CurriedFn()
+      self.dofn_process = lambda context: fn.process(context, *args, **kwargs)
+
     self.window_fn = windowing.windowfn
     self.context = context
     self.tagged_receivers = tagged_receivers
@@ -96,7 +99,7 @@ class DoFnRunner(object):
   def process(self, element):
     try:
       self.context.set_element(element)
-      self._process_outputs(element, self.dofn.process(self.context))
+      self._process_outputs(element, self.dofn_process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
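Caching `fn.process` once in the constructor avoids an attribute lookup per element, and in the curried case a lambda binds the extra arguments once. A minimal sketch of that choice, using a hypothetical `EchoFn` and `make_process` helper rather than Beam's DoFnRunner:

```python
class EchoFn(object):
  """Hypothetical DoFn-like object with a process(context, ...) method."""

  def process(self, context, *args, **kwargs):
    return [(context, args, kwargs)]


def make_process(fn, args, kwargs):
  """Resolves the per-element callable once, up front.

  With no extra arguments the bound method itself is cached, so later
  calls skip the fn.process attribute lookup. Otherwise a closure binds
  the extra arguments once; it still looks up fn.process on each call.
  """
  if not args and not kwargs:
    return fn.process
  return lambda context: fn.process(context, *args, **kwargs)
```

The runner then calls the cached callable per element, e.g. `process = make_process(fn, (), {})` followed by `process(context)` in the loop.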
 



[2/2] incubator-beam git commit: Closes #705

2016-07-21 Thread robertwb
Closes #705


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e5006991
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e5006991
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e5006991

Branch: refs/heads/python-sdk
Commit: e5006991d730f8fb222639f7c23b33cb8f046ce4
Parents: d78b38d 19082ad
Author: Robert Bradshaw 
Authored: Thu Jul 21 17:41:06 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 21 17:41:06 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd |  4 ++--
 sdks/python/apache_beam/runners/common.py  | 15 ---
 2 files changed, 14 insertions(+), 5 deletions(-)
--




[1/2] incubator-beam git commit: Minor cdef value changes.

2016-07-21 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d78b38d1d -> e5006991d


Minor cdef value changes.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/19082adb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/19082adb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/19082adb

Branch: refs/heads/python-sdk
Commit: 19082adb1522365b1f526e8c28a0a04a20744128
Parents: d78b38d
Author: Robert Bradshaw 
Authored: Thu Jul 21 10:36:36 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 21 17:38:55 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd |  4 ++--
 sdks/python/apache_beam/runners/common.py  | 15 ---
 2 files changed, 14 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19082adb/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 7191659..e1bf461 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -39,11 +39,11 @@ cdef class DoFnContext(object):
   cdef object label
   cdef object state
   cdef WindowedValue windowed_value
-  cdef set_element(self, WindowedValue windowed_value)
+  cpdef set_element(self, WindowedValue windowed_value)
 
 
 cdef class Receiver(object):
-  cdef receive(self, WindowedValue windowed_value)
+  cpdef receive(self, WindowedValue windowed_value)
 
 
 cdef class LoggingContext(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19082adb/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index a565645..c6b27bc 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -214,15 +214,24 @@ class DoFnContext(object):
 
   @property
   def element(self):
-    return self.windowed_value.value
+    if self.windowed_value is None:
+      raise AttributeError('element not accessible in this context')
+    else:
+      return self.windowed_value.value
 
   @property
   def timestamp(self):
-    return self.windowed_value.timestamp
+    if self.windowed_value is None:
+      raise AttributeError('timestamp not accessible in this context')
+    else:
+      return self.windowed_value.timestamp
 
   @property
   def windows(self):
-    return self.windowed_value.windows
+    if self.windowed_value is None:
+      raise AttributeError('windows not accessible in this context')
+    else:
+      return self.windowed_value.windows
 
   def aggregate_to(self, aggregator, input_value):
     self.state.counter_for(aggregator).update(input_value)
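With these checks, reading `element`, `timestamp`, or `windows` while no element is set (for example around `start_bundle`/`finish_bundle`, where the runner calls `set_element(None)`) fails with a descriptive message. Because the error is still an `AttributeError`, `hasattr()` and `getattr()` with a default keep working. A hypothetical, stripped-down `ElementContext` illustrating this:

```python
class ElementContext(object):
  """Hypothetical, stripped-down stand-in for DoFnContext."""

  def __init__(self):
    # None while no element is being processed.
    self.windowed_value = None

  def set_element(self, windowed_value):
    self.windowed_value = windowed_value

  @property
  def element(self):
    if self.windowed_value is None:
      raise AttributeError('element not accessible in this context')
    # Here a windowed value is modeled as a (value, timestamp, windows) tuple.
    return self.windowed_value[0]
```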



[GitHub] incubator-beam pull request #708: Revert "Restore (faster) logging context."

2016-07-21 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/708

Revert "Restore (faster) logging context."

This fixes the Jenkins build that relies on not-yet-pushed Dataflow changes.

---

This reverts commit ecf9e3a3cc3dcbb3413403d4558c95d4a0097350.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam no-no-logging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/708.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #708








[1/2] incubator-beam git commit: Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.

2016-07-28 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 351c3831d -> c155ef0eb


Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1f1fa06
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1f1fa06
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1f1fa06

Branch: refs/heads/python-sdk
Commit: a1f1fa06ee8683273182548e7eb2d6612040d2bf
Parents: 351c383
Author: Marian Dvorsky 
Authored: Thu Jul 28 13:02:15 2016 -0700
Committer: Marian Dvorsky 
Committed: Thu Jul 28 13:02:15 2016 -0700

--
 sdks/python/apache_beam/io/gcsio.py | 30 +-
 1 file changed, 21 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1f1fa06/sdks/python/apache_beam/io/gcsio.py
--
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9377266..88fcfb8 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -49,6 +49,7 @@ except ImportError:
 
 
 DEFAULT_READ_BUFFER_SIZE = 1024 * 1024
+WRITE_CHUNK_SIZE = 8 * 1024 * 1024
 
 
 def parse_gcs_path(gcs_path):
@@ -546,6 +547,10 @@ class GcsBufferedWriter(object):
     self.closed = False
     self.position = 0
 
+    # A small buffer to avoid CPU-heavy per-write pipe calls.
+    self.write_buffer = bytearray()
+    self.write_buffer_size = 128 * 1024
+
     # Set up communication with uploading thread.
     parent_conn, child_conn = multiprocessing.Pipe()
     self.child_conn = child_conn
@@ -557,7 +562,7 @@ class GcsBufferedWriter(object):
         bucket=self.bucket,
         name=self.name))
     self.upload = transfer.Upload(GcsBufferedWriter.PipeStream(child_conn),
-                                  mime_type)
+                                  mime_type, chunksize=WRITE_CHUNK_SIZE)
     self.upload.strategy = transfer.RESUMABLE_UPLOAD
 
     # Start uploading thread.
@@ -598,14 +603,10 @@ class GcsBufferedWriter(object):
     self._check_open()
     if not data:
       return
-    try:
-      self.conn.send_bytes(data)
-      self.position += len(data)
-    except IOError:
-      if self.upload_thread.last_error:
-        raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
-      else:
-        raise
+    self.write_buffer.extend(data)
+    if len(self.write_buffer) > self.write_buffer_size:
+      self._flush_write_buffer()
+    self.position += len(data)
 
   def tell(self):
     """Return the total number of bytes passed to write() so far."""
@@ -613,6 +614,7 @@ class GcsBufferedWriter(object):
 
   def close(self):
     """Close the current GCS file."""
+    self._flush_write_buffer()
     self.closed = True
     self.conn.close()
     self.upload_thread.join()
@@ -635,3 +637,13 @@ class GcsBufferedWriter(object):
 
   def writable(self):
     return True
+
+  def _flush_write_buffer(self):
+    try:
+      self.conn.send_bytes(buffer(self.write_buffer))
+      self.write_buffer = bytearray()
+    except IOError:
+      if self.upload_thread.last_error:
+        raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
+      else:
+        raise
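The writer now accumulates small writes in a `bytearray` and only sends once the buffer exceeds `write_buffer_size`, with a final flush in `close()`. The sketch below reproduces that batching with a hypothetical `BufferedPipeWriter` and a recording stand-in for the connection. It uses `bytes()` where the commit uses the Python 2-only `buffer()`, and it skips empty sends, which the commit's version does not.

```python
class BufferedPipeWriter(object):
  """Hypothetical writer that batches small writes before sending them.

  conn can be anything with send_bytes() and close(), for example one end
  of multiprocessing.Pipe().
  """

  def __init__(self, conn, buffer_size=128 * 1024):
    self.conn = conn
    self.buffer_size = buffer_size
    self.write_buffer = bytearray()
    self.position = 0

  def write(self, data):
    if not data:
      return
    self.write_buffer.extend(data)
    # Send only once the buffer grows past the threshold.
    if len(self.write_buffer) > self.buffer_size:
      self._flush()
    self.position += len(data)

  def tell(self):
    # Counts bytes passed to write(), whether or not they were sent yet.
    return self.position

  def close(self):
    # Send whatever is still buffered before closing the connection.
    self._flush()
    self.conn.close()

  def _flush(self):
    # Skipping empty sends is a choice of this sketch.
    if self.write_buffer:
      self.conn.send_bytes(bytes(self.write_buffer))
      self.write_buffer = bytearray()


class RecordingConn(object):
  """Hypothetical stand-in for a Connection that records each send."""

  def __init__(self):
    self.sent = []
    self.closed = False

  def send_bytes(self, data):
    self.sent.append(data)

  def close(self):
    self.closed = True
```

With a real pipe, `conn` would be one end of `multiprocessing.Pipe()` and the reading side would call `recv_bytes()` on the other end; each `send_bytes` then carries up to roughly `buffer_size` bytes instead of one message per `write()`.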



[2/2] incubator-beam git commit: Closes #752

2016-07-28 Thread robertwb
Closes #752


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c155ef0e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c155ef0e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c155ef0e

Branch: refs/heads/python-sdk
Commit: c155ef0ebc88ac34a3681b8bff6152e1857da847
Parents: 351c383 a1f1fa0
Author: Robert Bradshaw 
Authored: Thu Jul 28 18:17:43 2016 -0700
Committer: Robert Bradshaw 
Committed: Thu Jul 28 18:17:43 2016 -0700

--
 sdks/python/apache_beam/io/gcsio.py | 30 +-
 1 file changed, 21 insertions(+), 9 deletions(-)
--




[GitHub] incubator-beam pull request #748: Optimize Map and Flatmap when there are no...

2016-07-27 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/748

Optimize Map and Flatmap when there are no side inputs.


---

varargs and kwargs are expensive, even when they're empty.

This is especially true for otherwise one-argument Python calls
which are special cased in CPython.
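The cost of empty varargs can be checked locally. This hypothetical `time_calls` helper uses `timeit` to compare a plain one-argument call with the same call routed through empty `*args`/`**kwargs`. Timings depend on the interpreter version and machine, so no particular ratio is implied here.

```python
import timeit


def identity(element):
  return element


def time_calls(number=200000):
  """Returns (plain_seconds, unpacked_seconds) for `number` calls each."""
  env = {'f': identity, 'args': (), 'kwargs': {}}
  # Plain one-argument call.
  plain = timeit.timeit('f(1)', globals=env, number=number)
  # Same call, but routed through empty *args and **kwargs.
  unpacked = timeit.timeit('f(1, *args, **kwargs)', globals=env, number=number)
  return plain, unpacked
```

Calling `time_calls()` and comparing the two numbers on the target interpreter is more reliable than assuming a fixed speedup.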

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam fast-noarg

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/748.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #748


commit 257d3c449b68fbf842838c95bb2744bb517d02a9
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-28T01:29:59Z

Optimize Map and Flatmap when there are no side inputs.

varargs and kwargs are expensive, even when they're empty.

This is especially true for otherwise one-argument Python calls
which are special cased in CPython.






[GitHub] incubator-beam pull request #770: Implement add_input for all CombineFns.

2016-08-02 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/770

Implement add_input for all CombineFns.


---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam add-input

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/770.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #770


commit 3ebf28c6e0d17af3720076e33f88a0f126a89059
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2016-07-26T08:15:55Z

Implement add_input for all CombineFns.






[1/3] incubator-beam git commit: Implement add_input for all CombineFns.

2016-08-03 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e834fa82b -> 65152cab8


Implement add_input for all CombineFns.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ebf28c6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ebf28c6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ebf28c6

Branch: refs/heads/python-sdk
Commit: 3ebf28c6e0d17af3720076e33f88a0f126a89059
Parents: e834fa8
Author: Robert Bradshaw 
Authored: Tue Jul 26 01:15:55 2016 -0700
Committer: Robert Bradshaw 
Committed: Tue Aug 2 15:52:28 2016 -0700

--
 sdks/python/apache_beam/transforms/combiners.py | 16 
 sdks/python/apache_beam/transforms/core.py  |  6 +++---
 2 files changed, 11 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/combiners.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 155dcc6..c3f0da1 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -132,6 +132,9 @@ class CountCombineFn(core.CombineFn):
   def create_accumulator(self):
     return 0
 
+  def add_input(self, accumulator, element):
+    return accumulator + 1
+
   def add_inputs(self, accumulator, elements):
     return accumulator + len(elements)
 
@@ -425,9 +428,9 @@ class _TupleCombineFnBase(core.CombineFn):
 
 class TupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, e)
-            for c, a, e in zip(self._combiners, accumulator, zip(*elements))]
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, e)
+            for c, a, e in zip(self._combiners, accumulator, element)]
 
   def with_common_input(self):
     return SingleInputTupleCombineFn(*self._combiners)
@@ -435,8 +438,8 @@ class TupleCombineFn(_TupleCombineFnBase):
 
 class SingleInputTupleCombineFn(_TupleCombineFnBase):
 
-  def add_inputs(self, accumulator, elements):
-    return [c.add_inputs(a, elements)
+  def add_input(self, accumulator, element):
+    return [c.add_input(a, element)
             for c, a in zip(self._combiners, accumulator)]
 
 
@@ -522,9 +525,6 @@ def curry_combine_fn(fn, args, kwargs):
       def add_input(self, accumulator, element):
         return fn.add_input(accumulator, element, *args, **kwargs)
 
-      def add_inputs(self, accumulator, elements):
-        return fn.add_inputs(accumulator, elements, *args, **kwargs)
-
       def merge_accumulators(self, accumulators):
         return fn.merge_accumulators(accumulators, *args, **kwargs)
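With `add_input` defined on `CountCombineFn`, each element is folded into the accumulator one at a time. The lifecycle described in the `CombineFn` docstring in core.py (create an accumulator per batch, add each input, merge the accumulators, extract the output) can be sketched in plain Python. `CountFn` and `run_combine` are hypothetical and do not import Beam.

```python
class CountFn(object):
  """Hypothetical plain-Python counterpart of a counting CombineFn."""

  def create_accumulator(self):
    # The combination of zero values: a count of 0.
    return 0

  def add_input(self, accumulator, element):
    # Fold one element into the accumulator.
    return accumulator + 1

  def merge_accumulators(self, accumulators):
    return sum(accumulators)

  def extract_output(self, accumulator):
    return accumulator


def run_combine(combine_fn, batches):
  """Runs create/add_input/merge/extract over pre-partitioned batches."""
  accumulators = []
  for batch in batches:
    accumulator = combine_fn.create_accumulator()
    for element in batch:
      accumulator = combine_fn.add_input(accumulator, element)
    accumulators.append(accumulator)
  merged = combine_fn.merge_accumulators(accumulators)
  return combine_fn.extract_output(merged)
```

Only `add_input` is needed per element; a batch-level helper can always be derived by folding it over the batch, as the inner loop does.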
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 38b9cd2..da26205 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -270,7 +270,7 @@ class CombineFn(WithTypeHints):
   1. Input values are partitioned into one or more batches.
   2. For each batch, the create_accumulator method is invoked to create a fresh
      initial "accumulator" value representing the combination of zero values.
-  3. For each input value in the batch, the add_inputs method is invoked to
+  3. For each input value in the batch, the add_input method is invoked to
      combine more values with the accumulator for that batch.
   4. The merge_accumulators method is invoked to combine accumulators from
      separate batches into a single combined output accumulator value, once all
@@ -296,7 +296,7 @@ class CombineFn(WithTypeHints):
   def add_input(self, accumulator, element, *args, **kwargs):
     """Return result of folding element into accumulator.
 
-    CombineFn implementors must override either add_input or add_inputs.
+    CombineFn implementors must override add_input.
 
     Args:
       accumulator: the current accumulator
@@ -420,7 +420,7 @@ class CallableWrapperCombineFn(CombineFn):
     if accumulator is self._EMPTY:
       return self._fn(elements, *args, **kwargs)
     elif isinstance(elements, (list, tuple)):
-      return self._fn([accumulator] + elements, *args, **kwargs)
+      return self._fn([accumulator] + list(elements), *args, **kwargs)
     else:
       def union():
         yield accumulator
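The `list(elements)` change matters because this branch accepts both lists and tuples, and Python does not allow concatenating a list with a tuple: `[accumulator] + (1, 2)` raises `TypeError`. A hypothetical, reduced version of that branch:

```python
def combine_with_accumulator(fn, accumulator, elements):
  """Hypothetical reduction of the list/tuple branch shown above.

  fn takes an iterable of values (e.g. sum or max). elements may be a
  list or a tuple, so it is converted with list() before concatenation.
  """
  return fn([accumulator] + list(elements))
```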



[2/3] incubator-beam git commit: Document TupleCombineFns

2016-08-03 Thread robertwb
Document TupleCombineFns


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a2239d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a2239d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a2239d3

Branch: refs/heads/python-sdk
Commit: 4a2239d3701e13622998c71107d263c8966e73e1
Parents: 3ebf28c
Author: Robert Bradshaw 
Authored: Wed Aug 3 13:52:36 2016 -0700
Committer: Robert Bradshaw 
Committed: Wed Aug 3 13:52:36 2016 -0700

--
 sdks/python/apache_beam/transforms/combiners.py | 12 
 1 file changed, 12 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a2239d3/sdks/python/apache_beam/transforms/combiners.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index c3f0da1..a0604b8 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -427,6 +427,12 @@ class _TupleCombineFnBase(core.CombineFn):
 
 
 class TupleCombineFn(_TupleCombineFnBase):
+  """A combiner for combining tuples via a tuple of combiners.
+
+  Takes as input a tuple of N CombineFns and combines N-tuples by
+  combining the k-th element of each tuple with the k-th CombineFn,
+  outputting a new N-tuple of combined values.
+  """
 
   def add_input(self, accumulator, element):
     return [c.add_input(a, e)
@@ -437,6 +443,12 @@ class TupleCombineFn(_TupleCombineFnBase):
 
 
 class SingleInputTupleCombineFn(_TupleCombineFnBase):
+  """A combiner for combining a single value via a tuple of combiners.
+
+  Takes as input a tuple of N CombineFns and combines elements by
+  applying each CombineFn to each input, producing an N-tuple of
+  the outputs corresponding to each of the N CombineFn's outputs.
+  """
 
   def add_input(self, accumulator, element):
     return [c.add_input(a, element)
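The two docstrings above differ only in how inputs are routed to the N combiners. The sketch below is a sequential illustration of that difference under stated assumptions. `SumFn`, `MaxFn`, `combine_tuples` and `combine_single` are hypothetical names, and the stand-in combiners only implement `create_accumulator`, `add_input` and `extract_output`. Merging of accumulators and distribution across workers are deliberately left out.

```python
class SumFn(object):
    """Hypothetical stand-in for a CombineFn that sums its inputs."""

    def create_accumulator(self):
        return 0

    def add_input(self, accumulator, element):
        return accumulator + element

    def extract_output(self, accumulator):
        return accumulator


class MaxFn(object):
    """Hypothetical stand-in for a CombineFn that keeps the maximum."""

    def create_accumulator(self):
        return None

    def add_input(self, accumulator, element):
        return element if accumulator is None else max(accumulator, element)

    def extract_output(self, accumulator):
        return accumulator


def combine_tuples(fns, rows):
    """TupleCombineFn-style: field k of every row is fed to fns[k]."""
    accs = [fn.create_accumulator() for fn in fns]
    for row in rows:
        accs = [fn.add_input(a, e) for fn, a, e in zip(fns, accs, row)]
    return tuple(fn.extract_output(a) for fn, a in zip(fns, accs))


def combine_single(fns, values):
    """SingleInputTupleCombineFn-style: every value is fed to every fn."""
    accs = [fn.create_accumulator() for fn in fns]
    for value in values:
        accs = [fn.add_input(a, value) for fn, a in zip(fns, accs)]
    return tuple(fn.extract_output(a) for fn, a in zip(fns, accs))


print(combine_tuples([SumFn(), MaxFn()], [(1, 5), (2, 3)]))  # (3, 5)
print(combine_single([SumFn(), MaxFn()], [1, 4, 2]))         # (7, 4)
```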



[GitHub] incubator-beam pull request #702: Add tests for WindowedValue.

2016-07-20 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/702

Add tests for WindowedValue.

I just realized I forgot to add this file when doing #691.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam wv-tests

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/702.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #702


commit 59bea7c04a5d4ae2ee69ab0ef11401a0063583a4
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-20T21:10:11Z

Add tests for WindowedValue.




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[2/2] incubator-beam git commit: Closes #694

2016-07-20 Thread robertwb
Closes #694


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22126285
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22126285
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22126285

Branch: refs/heads/python-sdk
Commit: 2212628585d6bfc331954b40f8c2823878ab3fce
Parents: 71214b3 7baa662
Author: Robert Bradshaw 
Authored: Wed Jul 20 11:54:20 2016 -0700
Committer: Robert Bradshaw 
Committed: Wed Jul 20 11:54:20 2016 -0700

--
 sdks/python/apache_beam/utils/dependency.py | 6 +-
 1 file changed, 1 insertion(+), 5 deletions(-)
--




[4/5] incubator-beam git commit: Clarifying comments.

2016-07-20 Thread robertwb
Clarifying comments.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d1fc85d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d1fc85d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d1fc85d

Branch: refs/heads/python-sdk
Commit: 9d1fc85d5352f16ce8f0f092865decc6b296dbb8
Parents: 2768981
Author: Robert Bradshaw <rober...@google.com>
Authored: Wed Jul 20 12:24:18 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Wed Jul 20 13:06:21 2016 -0700

--
 sdks/python/apache_beam/runners/common.py   |  1 +
 sdks/python/apache_beam/utils/windowed_value.py | 12 ++--
 2 files changed, 11 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d1fc85d/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 059359c..ef28c63 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -190,6 +190,7 @@ class DoFnState(object):
 self.step_name, aggregator)
 
 
+# TODO(robertwb): Replace core.DoFnContext with this.
 class DoFnContext(object):
 
   def __init__(self, label, element=None, state=None):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d1fc85d/sdks/python/apache_beam/utils/windowed_value.py
--
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
index 5172b36..4c50e72 100644
--- a/sdks/python/apache_beam/utils/windowed_value.py
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -72,11 +72,16 @@ class WindowedValue(object):
   # We'd rather implement __eq__, but Cython supports that via __richcmp__
   # instead.  Fortunately __cmp__ is understood by both (but not by Python 3).
   def __cmp__(left, right):  # pylint: disable=no-self-argument
+    """Compares left and right for equality.
+
+    For performance reasons, doesn't actually impose an ordering
+    on unequal values (always returning 1).
+    """
     if type(left) is not type(right):
       return cmp(type(left), type(right))
     else:
-      # Don't bother paying the cost of a total ordering.
       # TODO(robertwb): Avoid the type checks?
+      # Returns False (0) if equal, and True (1) if not.
       return not WindowedValue._typed_eq(left, right)
 
   @staticmethod
@@ -111,4 +116,7 @@ def create(value, timestamp_micros, windows):
 try:
   WindowedValue.timestamp_object = None
 except TypeError:
-  pass  # Cythonized class already has this default value.
+  # When we're compiled, we can't dynamically add attributes to
+  # the cdef class, but in this case it's OK as it's already present
+  # on each instance.
+  pass
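The comment above notes that `__cmp__` is not understood by Python 3. The sketch below shows the same "equality but no ordering" contract written with `__eq__`/`__ne__`/`__hash__` instead. It assumes `windows` is hashable (a tuple here). `EqualityOnlyValue` is a hypothetical class for illustration, not Beam's `WindowedValue`.

```python
class EqualityOnlyValue(object):
    """Hypothetical Python 3 analogue of WindowedValue's equality-only __cmp__.

    Equality is defined, but no ordering: `<` and `>` raise TypeError.
    """

    def __init__(self, value, timestamp_micros, windows):
        self.value = value
        self.timestamp_micros = timestamp_micros
        self.windows = windows  # assumed hashable, e.g. a tuple

    def __eq__(self, other):
        if type(self) is not type(other):
            return NotImplemented
        # Compare the cheap integer field first, as _typed_eq does.
        return (self.timestamp_micros == other.timestamp_micros
                and self.value == other.value
                and self.windows == other.windows)

    def __ne__(self, other):
        # Python 3 derives __ne__ from __eq__; spelled out for Python 2.
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        return (hash(self.value) + 3 * self.timestamp_micros
                + 7 * hash(self.windows))


a = EqualityOnlyValue('x', 10, ('w',))
b = EqualityOnlyValue('x', 10, ('w',))
print(a == b, a != EqualityOnlyValue('x', 11, ('w',)))  # True True
```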



[3/5] incubator-beam git commit: Cythonize WindowedValue class.

2016-07-20 Thread robertwb
t, WindowedValue right) except? -2
+
+@cython.locals(wv=WindowedValue)
+cdef inline WindowedValue create(
+  object value, int64_t timestamp_micros, object windows)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc42467f/sdks/python/apache_beam/utils/windowed_value.py
--
diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py
new file mode 100644
index 000..5172b36
--- /dev/null
+++ b/sdks/python/apache_beam/utils/windowed_value.py
@@ -0,0 +1,114 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Core windowing data structures.
+"""
+
+# This module is carefully crafted to have optimal performance when
+# compiled while still being valid Python.  Care needs to be taken when
+# editing this file as WindowedValues are created for every element for
+# every step in a Beam pipeline.
+
+#cython: profile=True
+
+from apache_beam.transforms.timeutil import MAX_TIMESTAMP
+from apache_beam.transforms.timeutil import MIN_TIMESTAMP
+from apache_beam.transforms.timeutil import Timestamp
+
+
+class WindowedValue(object):
+  """A windowed value having a value, a timestamp and set of windows.
+
+  Attributes:
+    value: The underlying value of a windowed value.
+    timestamp: Timestamp associated with the value as seconds since Unix epoch.
+    windows: A set (iterable) of window objects for the value. The window
+      objects are descendants of the BoundedWindow class.
+  """
+
+  def __init__(self, value, timestamp, windows):
+    # For performance reasons, only timestamp_micros is stored by default
+    # (as a C int). The Timestamp object is created on demand below.
+    self.value = value
+    if isinstance(timestamp, int):
+      self.timestamp_micros = timestamp * 1000000
+    else:
+      self.timestamp_object = (timestamp if isinstance(timestamp, Timestamp)
+                               else Timestamp.of(timestamp))
+      self.timestamp_micros = self.timestamp_object.micros
+    self.windows = windows
+
+  @property
+  def timestamp(self):
+    if self.timestamp_object is None:
+      self.timestamp_object = Timestamp(0, self.timestamp_micros)
+    return self.timestamp_object
+
+  def __repr__(self):
+    return '(%s, %s, %s)' % (
+        repr(self.value),
+        'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else
+        'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else
+        float(self.timestamp),
+        self.windows)
+
+  def __hash__(self):
+    return hash(self.value) + 3 * self.timestamp_micros + 7 * hash(self.windows)
+
+  # We'd rather implement __eq__, but Cython supports that via __richcmp__
+  # instead.  Fortunately __cmp__ is understood by both (but not by Python 3).
+  def __cmp__(left, right):  # pylint: disable=no-self-argument
+    if type(left) is not type(right):
+      return cmp(type(left), type(right))
+    else:
+      # Don't bother paying the cost of a total ordering.
+      # TODO(robertwb): Avoid the type checks?
+      return not WindowedValue._typed_eq(left, right)
+
+  @staticmethod
+  def _typed_eq(left, right):
+    if (left.timestamp_micros == right.timestamp_micros
+        and left.value == right.value
+        and left.windows == right.windows):
+      return True
+    else:
+      return False
+
+  def with_value(self, new_value):
+    """Creates a new WindowedValue with the same timestamps and windows as this.
+
+    This is the fastest way to create a new WindowedValue.
+    """
+    return create(new_value, self.timestamp_micros, self.windows)
+
+  def __reduce__(self):
+    return WindowedValue, (self.value, self.timestamp, self.windows)
+
+
+# TODO(robertwb): Move this to a static method.
+def create(value, timestamp_micros, windows):
+  wv = WindowedValue.__new__(WindowedValue)
+  wv.value = value
+  wv.timestamp_micros = timestamp_micros
+  wv.windows = windows
+  return wv
+
+
+try:
+  WindowedValue.timestamp_object = None
+except TypeError:
+  pass  # Cythonized class already has this default value.
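The new module stores only integer microseconds and builds the richer timestamp object lazily. `with_value` bypasses `__init__` through a `create` helper, and a class-level `timestamp_object = None` default lets the lazy property work. The pure-Python sketch below mimics that pattern under stated assumptions. `LazyTimestampValue` is hypothetical, and it uses float seconds in place of Beam's `Timestamp` class.

```python
class LazyTimestampValue(object):
    """Hypothetical, simplified WindowedValue for illustration only.

    Stores the timestamp as integer microseconds and builds the richer
    timestamp object (here just float seconds, not Beam's Timestamp) lazily.
    """

    # Class-level default, playing the role of
    # `WindowedValue.timestamp_object = None` at the end of the module.
    timestamp_object = None

    def __init__(self, value, timestamp, windows):
        self.value = value
        if isinstance(timestamp, int):
            self.timestamp_micros = timestamp * 1000000
        else:
            self.timestamp_micros = int(round(timestamp * 1000000))
        self.windows = windows

    @property
    def timestamp(self):
        if self.timestamp_object is None:
            # Cache on the instance; the class default stays None.
            self.timestamp_object = self.timestamp_micros / 1e6
        return self.timestamp_object

    def with_value(self, new_value):
        return create(new_value, self.timestamp_micros, self.windows)


def create(value, timestamp_micros, windows):
    # Allocate without running __init__, skipping its type checks.
    wv = LazyTimestampValue.__new__(LazyTimestampValue)
    wv.value = value
    wv.timestamp_micros = timestamp_micros
    wv.windows = windows
    return wv


original = LazyTimestampValue('a', 5, ('w',))
clone = original.with_value('b')
print(clone.value, clone.timestamp_micros, clone.timestamp)  # b 5000000 5.0
```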

http://git-wip-us.apache.org/repos/asf/incubator-bea

[5/5] incubator-beam git commit: Add Cython DoFnContext and Receiver stubs.

2016-07-20 Thread robertwb
Add Cython DoFnContext and Receiver stubs.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8efc231c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8efc231c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8efc231c

Branch: refs/heads/python-sdk
Commit: 8efc231c867fcf5267ba21a8a33ee306b87d9945
Parents: dc42467
Author: Robert Bradshaw <rober...@google.com>
Authored: Tue Jul 19 12:22:08 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Wed Jul 20 13:06:21 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd | 20 ---
 sdks/python/apache_beam/runners/common.py  | 34 -
 2 files changed, 50 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.pxd
--
diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd
index 480c056..e855376 100644
--- a/sdks/python/apache_beam/runners/common.pxd
+++ b/sdks/python/apache_beam/runners/common.pxd
@@ -15,17 +15,31 @@
 # limitations under the License.
 #
 
-cdef type SideOutputValue, TimestampedValue, WindowedValue
+from apache_beam.utils.windowed_value cimport WindowedValue
+
+cdef type SideOutputValue, TimestampedValue
+
 
 cdef class DoFnRunner(object):
 
   cdef object dofn
   cdef object window_fn
-  cdef object context
+  cdef object context   # TODO(robertwb): Make this a DoFnContext
   cdef object tagged_receivers
   cdef object logger
   cdef object step_name
 
-  cdef object main_receivers
+  cdef object main_receivers   # TODO(robertwb): Make this a Receiver
 
   cpdef _process_outputs(self, element, results)
+
+
+cdef class DoFnContext(object):
+  cdef object label
+  cdef object state
+  cdef WindowedValue windowed_value
+  cdef set_element(self, WindowedValue windowed_value)
+
+
+cdef class Receiver(object):
+  cdef receive(self, WindowedValue windowed_value)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 3c0c3f6..059359c 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -25,8 +25,8 @@ from apache_beam.internal import util
 from apache_beam.pvalue import SideOutputValue
 from apache_beam.transforms import core
 from apache_beam.transforms.window import TimestampedValue
-from apache_beam.transforms.window import WindowedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.utils.windowed_value import WindowedValue
 
 
 class FakeLogger(object):
@@ -188,3 +188,35 @@ class DoFnState(object):
 """Looks up the counter for this aggregator, creating one if necessary."""
 return self._counter_factory.get_aggregator_counter(
 self.step_name, aggregator)
+
+
+class DoFnContext(object):
+
+  def __init__(self, label, element=None, state=None):
+    self.label = label
+    self.state = state
+    if element is not None:
+      self.set_element(element)
+
+  def set_element(self, windowed_value):
+    self.windowed_value = windowed_value
+
+  @property
+  def element(self):
+    return self.windowed_value.value
+
+  @property
+  def timestamp(self):
+    return self.windowed_value.timestamp
+
+  @property
+  def windows(self):
+    return self.windowed_value.windows
+
+  def aggregate_to(self, aggregator, input_value):
+    self.state.counter_for(aggregator).update(input_value)
+
+
+class Receiver(object):
+  def receive(self, windowed_value):
+    raise NotImplementedError
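The new `DoFnContext` is mutable: `set_element` swaps in the current windowed value. This allows a single context object to be reused across elements, and `Receiver` is an abstract sink. The sketch below shows how such pieces might fit together in a driver loop. `SimpleWindowedValue`, `ListReceiver`, `MiniContext` and `run_map` are all hypothetical, and this is not how Beam's runners are actually wired.

```python
from collections import namedtuple

# Hypothetical stand-in for WindowedValue carrying only the three fields.
SimpleWindowedValue = namedtuple(
    'SimpleWindowedValue', ['value', 'timestamp', 'windows'])


class Receiver(object):
    def receive(self, windowed_value):
        raise NotImplementedError


class ListReceiver(Receiver):
    """Hypothetical Receiver that buffers everything it is handed."""

    def __init__(self):
        self.received = []

    def receive(self, windowed_value):
        self.received.append(windowed_value)


class MiniContext(object):
    """Trimmed-down copy of the DoFnContext above (no state/aggregators)."""

    def __init__(self, label):
        self.label = label
        self.windowed_value = None

    def set_element(self, windowed_value):
        self.windowed_value = windowed_value

    @property
    def element(self):
        return self.windowed_value.value


def run_map(fn, inputs, receiver):
    """Hypothetical driver loop: one context object is reused per element."""
    context = MiniContext('map')
    for wv in inputs:
        context.set_element(wv)
        # The output keeps the input's timestamp and windows.
        receiver.receive(wv._replace(value=fn(context.element)))


out = ListReceiver()
run_map(lambda x: x * 2,
        [SimpleWindowedValue(1, 0, ('g',)), SimpleWindowedValue(3, 1, ('g',))],
        out)
print([wv.value for wv in out.received])  # [2, 6]
```

Reusing one context avoids allocating a fresh object per element, in the same performance-minded spirit as the rest of this patch series.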



[2/5] incubator-beam git commit: Reduce the number of elements in the pvalue caching test.

2016-07-20 Thread robertwb
Reduce the number of elements in the pvalue caching test.

It seems this test is causing travis-ci to time out,
as the non-compiled version got slightly slower.

100,000 elements should be sufficient to see the effects
of not caching.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27689819
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27689819
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27689819

Branch: refs/heads/python-sdk
Commit: 276898195819329a1b88002a41a109df61530a26
Parents: 8efc231
Author: Robert Bradshaw 
Authored: Wed Jul 20 01:37:49 2016 -0700
Committer: Robert Bradshaw 
Committed: Wed Jul 20 13:06:21 2016 -0700

--
 sdks/python/apache_beam/pipeline_test.py | 18 ++
 1 file changed, 10 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27689819/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c1db5cb..86ae45f 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -207,11 +207,13 @@ class PipelineTest(unittest.TestCase):
       yield o
       yield SideOutputValue('side', o)

+    num_elements = 100000
+
     pipeline = Pipeline('DirectPipelineRunner')

     gc.collect()
     count_threshold = len(gc.get_objects()) + 1
-    biglist = pipeline | Create('oom:create', ['x'] * 1000000)
+    biglist = pipeline | Create('oom:create', ['x'] * num_elements)
     dupes = (
         biglist
         | Map('oom:addone', lambda x: (x, 1))
@@ -223,17 +225,17 @@ class PipelineTest(unittest.TestCase):
         | CombinePerKey('oom:combine', sum)
         | Map('oom:check', check_memory, count_threshold))

-    assert_that(result, equal_to([('x', 3000000)]))
+    assert_that(result, equal_to([('x', 3 * num_elements)]))
     pipeline.run()
     self.assertEqual(
         pipeline.runner.debug_counters['element_counts'],
         {
-            'oom:flatten': 3000000,
-            ('oom:combine/GroupByKey/reify_windows', None): 3000000,
-            ('oom:dupes/oom:dupes', 'side'): 1000000,
-            ('oom:dupes/oom:dupes', None): 1000000,
-            'oom:create': 1000000,
-            ('oom:addone', None): 1000000,
+            'oom:flatten': 3 * num_elements,
+            ('oom:combine/GroupByKey/reify_windows', None): 3 * num_elements,
+            ('oom:dupes/oom:dupes', 'side'): num_elements,
+            ('oom:dupes/oom:dupes', None): num_elements,
+            'oom:create': num_elements,
+            ('oom:addone', None): num_elements,
             'oom:combine/GroupByKey/group_by_key': 1,
             ('oom:check', None): 1,
             'assert_that/singleton': 1,



[1/5] incubator-beam git commit: Closes #691

2016-07-20 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk a643e2009 -> 37f426faf


Closes #691


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37f426fa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37f426fa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37f426fa

Branch: refs/heads/python-sdk
Commit: 37f426faf5c47d5b99d100292fd0cde6a640939e
Parents: a643e20 9d1fc85
Author: Robert Bradshaw 
Authored: Wed Jul 20 13:06:21 2016 -0700
Committer: Robert Bradshaw 
Committed: Wed Jul 20 13:06:21 2016 -0700

--
 sdks/python/apache_beam/pipeline_test.py|  18 +--
 sdks/python/apache_beam/runners/common.pxd  |  20 ++-
 sdks/python/apache_beam/runners/common.py   |  35 +-
 sdks/python/apache_beam/transforms/window.py|  37 +-
 .../python/apache_beam/utils/windowed_value.pxd |  38 ++
 sdks/python/apache_beam/utils/windowed_value.py | 122 +++
 sdks/python/setup.py|   1 +
 7 files changed, 223 insertions(+), 48 deletions(-)
--




[GitHub] incubator-beam pull request #676: Remove pipeline.apply(pvalue, callable)

2016-07-18 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/676

Remove pipeline.apply(pvalue, callable)

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam no-callable

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/676.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #676


commit dd0b437fdd6ffa13f758dfd8daed4ee6aa4fd758
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-18T17:23:24Z

Remove pipeline.apply(pvalue, callable)






[GitHub] incubator-beam pull request #679: Add >> operator for labeling PTransforms.

2016-07-18 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/679

Add >> operator for labeling PTransforms.


Also propagates label passed into PValue.apply(...) method.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam rshift

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #679


commit 6c89554ec014440ffd778e58bc1010ac68afbc91
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-18T19:46:54Z

Add >> operator for labeling PTransforms.

Also propagates label passed into PValue.apply(...) method.






[2/2] incubator-beam git commit: Closes #683

2016-07-19 Thread robertwb
Closes #683


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/062af66f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/062af66f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/062af66f

Branch: refs/heads/python-sdk
Commit: 062af66f83329485d06c6dc68afa29b6a39a91b1
Parents: 5610905 fdab1f9
Author: Robert Bradshaw 
Authored: Mon Jul 18 23:41:14 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 23:41:14 2016 -0700

--
 sdks/python/apache_beam/coders/coders.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--




[1/2] incubator-beam git commit: Fix comment.

2016-07-19 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 561090580 -> 062af66f8


Fix comment.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fdab1f94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fdab1f94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fdab1f94

Branch: refs/heads/python-sdk
Commit: fdab1f9458eb8ad8b0210b8ccc54f5a44960f3bc
Parents: 5610905
Author: Robert Bradshaw 
Authored: Mon Jul 18 23:40:25 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 23:40:25 2016 -0700

--
 sdks/python/apache_beam/coders/coders.py | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fdab1f94/sdks/python/apache_beam/coders/coders.py
--
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 1c5043c..a5ed7f9 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -384,9 +384,9 @@ class FastPrimitivesCoder(FastCoder):
 
 return value
 
-  # We allow .key_coder() and .value_coder() to be called on PickleCoder since
-  # we can't always infer the return values of lambdas in ParDo operations, the
-  # result of which may be used in a GroupBykey.
+  # We allow .key_coder() and .value_coder() to be called on FastPrimitivesCoder
+  # since we can't always infer the return values of lambdas in ParDo
+  # operations, the result of which may be used in a GroupByKey.
   def is_kv_coder(self):
 return True
 



[2/2] incubator-beam git commit: Remove pipeline.apply(pvalue, callable)

2016-07-18 Thread robertwb
Remove pipeline.apply(pvalue, callable)


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e68eb05e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e68eb05e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e68eb05e

Branch: refs/heads/python-sdk
Commit: e68eb05e5ffa366f83478807bbac3af83ea8cda5
Parents: 5daab7f
Author: Robert Bradshaw 
Authored: Mon Jul 18 10:23:24 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:46:16 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py|  5 +++--
 sdks/python/apache_beam/pipeline.py | 22 +++-
 sdks/python/apache_beam/pipeline_test.py| 18 
 .../python/apache_beam/transforms/ptransform.py |  2 ++
 4 files changed, 8 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/dataflow_test.py
--
diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py
index c4933af..9bbb5ff 100644
--- a/sdks/python/apache_beam/dataflow_test.py
+++ b/sdks/python/apache_beam/dataflow_test.py
@@ -23,6 +23,7 @@ import logging
 import re
 import unittest
 
+import apache_beam as beam
 from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsDict
 from apache_beam.pvalue import AsIter as AllOf
@@ -51,8 +52,8 @@ class DataflowTest(unittest.TestCase):
 
   # TODO(silviuc): Figure out a nice way to specify labels for stages so that
   # internal steps get prepended with surrounding stage names.
-  @staticmethod
-  def Count(pcoll):  # pylint: disable=invalid-name
+  @beam.ptransform_fn
+  def Count(pcoll):  # pylint: disable=invalid-name, no-self-argument
 """A Count transform: v, ... => (v, n), ..."""
 return (pcoll
 | Map('AddCount', lambda x: (x, 1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline.py
--
diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py
index 012d4d9..bc1feb2 100644
--- a/sdks/python/apache_beam/pipeline.py
+++ b/sdks/python/apache_beam/pipeline.py
@@ -47,7 +47,6 @@ import logging
 import os
 import shutil
 import tempfile
-import types
 
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -187,20 +186,17 @@ class Pipeline(object):
     """Applies a custom transform using the pvalueish specified.

     Args:
-      transform: the PTranform (or callable) to apply.
+      transform: the PTransform to apply.
       pvalueish: the input for the PTransform (typically a PCollection).

     Raises:
       TypeError: if the transform object extracted from the argument list is
-        not a callable type or a descendant from PTransform.
+        not a PTransform.
       RuntimeError: if the transform object was already applied to this pipeline
         and needs to be cloned in order to apply again.
     """
     if not isinstance(transform, ptransform.PTransform):
-      if isinstance(transform, (type, types.ClassType)):
-        raise TypeError("%s is not a PTransform instance, did you mean %s()?"
-                        % (transform, transform.__name__))
-      transform = _CallableWrapperPTransform(transform)
+      raise TypeError("Expected a PTransform object, got %s" % transform)

     full_label = format_full_label(self._current_transform(), transform)
 if full_label in self.applied_labels:
@@ -286,18 +282,6 @@ class Pipeline(object):
 return pvalueish_result
 
 
-class _CallableWrapperPTransform(ptransform.PTransform):
-
-  def __init__(self, callee):
-    assert callable(callee)
-    super(_CallableWrapperPTransform, self).__init__(
-        label=getattr(callee, '__name__', 'Callable'))
-    self._callee = callee
-
-  def apply(self, *args, **kwargs):
-    return self._callee(*args, **kwargs)
-
-
 class PipelineVisitor(object):
   """Visitor pattern class used to traverse a DAG of transforms.
 
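With the implicit callable wrapping removed, `Pipeline.apply` now rejects anything that is not a `PTransform`. The `dataflow_test.py` change instead turns the `Count` function into a transform with `@beam.ptransform_fn`. The standalone sketch below illustrates that division of labor without importing Beam. `TransformBase`, `transform_fn` and `apply_transform` are hypothetical names, and `transform_fn` is only "in the spirit of" `ptransform_fn`, not its actual implementation.

```python
class TransformBase(object):
    """Hypothetical stand-in for PTransform."""

    def apply(self, pcoll):
        raise NotImplementedError


def transform_fn(fn):
    """Hypothetical decorator in the spirit of beam.ptransform_fn:
    turns a function of a collection into a transform class."""

    class _FnTransform(TransformBase):
        def __init__(self, *args, **kwargs):
            self._args = args
            self._kwargs = kwargs

        def apply(self, pcoll):
            return fn(pcoll, *self._args, **self._kwargs)

    _FnTransform.__name__ = fn.__name__
    return _FnTransform


def apply_transform(transform, pvalueish):
    """Mirrors the stricter check: plain callables are no longer wrapped."""
    if not isinstance(transform, TransformBase):
        raise TypeError("Expected a PTransform object, got %s" % transform)
    return transform.apply(pvalueish)


@transform_fn
def Count(pcoll):  # pylint: disable=invalid-name
    counts = {}
    for x in pcoll:
        counts[x] = counts.get(x, 0) + 1
    return sorted(counts.items())


print(apply_transform(Count(), ['a', 'b', 'a']))  # [('a', 2), ('b', 1)]
```

Passing a bare function such as `len` to `apply_transform` raises `TypeError`. This mirrors how the patched `Pipeline.apply` no longer silently wraps callables.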

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline_test.py
--
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index 8598737..04cd2ee 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -32,7 +32,6 @@ from apache_beam.transforms import Create
 from apache_beam.transforms import FlatMap
 from apache_beam.transforms import Flatten
 from apache_beam.transforms import Map
-from apache_beam.transforms import 

[1/2] incubator-beam git commit: Add >> operator for labeling PTransforms.

2016-07-18 Thread robertwb
cm.exception.message,
         'Transform "CustomTransform" does not have a stable unique label. '
         'This will prevent updating of pipelines. '
-        'To clone a transform with a new label use: '
-        'transform.clone("NEW LABEL").')
+        'To apply a transform with a specified label write '
+        'pvalue | "label" >> transform')

   def test_reuse_cloned_custom_transform_instance(self):
     pipeline = Pipeline(self.runner_name)
-    pcoll1 = pipeline | Create('pcoll1', [1, 2, 3])
-    pcoll2 = pipeline | Create('pcoll2', [4, 5, 6])
+    pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3])
+    pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6])
     transform = PipelineTest.CustomTransform()
     result1 = pcoll1 | transform
-    result2 = pcoll2 | transform.clone('new label')
+    result2 = pcoll2 | 'new_label' >> transform
     assert_that(result1, equal_to([2, 3, 4]), label='r1')
     assert_that(result2, equal_to([5, 6, 7]), label='r2')
     pipeline.run()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a740f827/sdks/python/apache_beam/pvalue.py
--
diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py
index 6fc3041..063d0b5 100644
--- a/sdks/python/apache_beam/pvalue.py
+++ b/sdks/python/apache_beam/pvalue.py
@@ -79,10 +79,8 @@ class PValue(object):
     optional first label and a transform/callable object. It will call the
     pipeline.apply() method with this modified argument list.
     """
-    if isinstance(args[0], str):
-      # TODO(robertwb): Make sure labels are properly passed during
-      # ptransform construction and drop this argument.
-      args = args[1:]
+    if isinstance(args[0], basestring):
+      kwargs['label'], args = args[0], args[1:]
     arglist = list(args)
     arglist.insert(1, self)
     return self.pipeline.apply(*arglist, **kwargs)
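Previously a leading string label passed to `PValue.apply` was silently dropped. The patched code forwards it as `kwargs['label']` and then inserts the pvalue after the transform. The helper below reproduces that argument shuffling in isolation. `build_apply_call` and `FakeTransform` are hypothetical names, and the sketch tests `str` (the Python 3 analogue of the patch's `basestring`).

```python
def build_apply_call(pvalue, args, kwargs):
    """Hypothetical helper reproducing the patched PValue.apply logic.

    A leading string argument is treated as the label and moved into
    kwargs['label']; the pvalue is then inserted after the transform.
    """
    kwargs = dict(kwargs)  # copy so the caller's dict is not mutated
    # The patch tests `basestring` (Python 2); `str` is the Python 3 analogue.
    if args and isinstance(args[0], str):
        kwargs['label'], args = args[0], args[1:]
    arglist = list(args)
    arglist.insert(1, pvalue)
    return arglist, kwargs


class FakeTransform(object):
    """Hypothetical placeholder for a PTransform instance."""

    def __repr__(self):
        return 'FakeTransform()'


print(build_apply_call('pcoll', ('MyLabel', FakeTransform()), {}))
# ([FakeTransform(), 'pcoll'], {'label': 'MyLabel'})
```

The copy of `kwargs` is a defensive choice for a standalone helper. Inside the real `apply(self, *args, **kwargs)`, `kwargs` is already a fresh dict per call, so mutating it in place, as the patch does, is safe.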

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a740f827/sdks/python/apache_beam/transforms/ptransform.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 1457bec..bde05b5 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -369,6 +369,9 @@ class PTransform(WithTypeHints):
     # TODO(robertwb): Assert all input WindowFns compatible.
     return inputs[0].windowing

+  def __rrshift__(self, label):
+    return _NamedPTransform(self, label)
+
   def __or__(self, right):
     """Used to compose PTransforms, e.g., ptransform1 | ptransform2."""
     if isinstance(right, PTransform):
@@ -676,24 +679,6 @@ def ptransform_fn(fn):
   return CallablePTransform(fn)


-def format_full_label(applied_transform, pending_transform):
-  """Returns a fully formatted cumulative PTransform label.
-
-  Args:
-    applied_transform: An instance of an AppliedPTransform that has been fully
-      applied prior to 'pending_transform'.
-    pending_transform: An instance of PTransform that has yet to be applied to
-      the Pipeline.
-
-  Returns:
-    A fully formatted PTransform label. Example: '/foo/bar/baz'.
-  """
-  label = '/'.join([applied_transform.full_label, pending_transform.label])
-  # Remove leading backslash because the monitoring UI expects names that do not
-  # start with such a character.
-  return label if not label.startswith('/') else label[1:]
-
-
 def label_from_callable(fn):
   if hasattr(fn, 'default_label'):
     return fn.default_label()
@@ -706,3 +691,13 @@ def label_from_callable(fn):
       return fn.__name__
   else:
     return str(fn)
+
+
+class _NamedPTransform(PTransform):
+
+  def __init__(self, transform, label):
+    super(_NamedPTransform, self).__init__(label)
+    self.transform = transform
+
+  def apply(self, pvalue):
+    raise RuntimeError("Should never be applied directly.")



[GitHub] incubator-beam pull request #679: Add >> operator for labeling PTransforms.

2016-07-18 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/679




[GitHub] incubator-beam pull request #676: Remove pipeline.apply(pvalue, callable)

2016-07-18 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/676




[1/2] incubator-beam git commit: Closes #676

2016-07-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 5daab7fb3 -> ad7c216f4


Closes #676


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad7c216f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad7c216f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad7c216f

Branch: refs/heads/python-sdk
Commit: ad7c216f4949cb098ab7eacaefc68c22f6a16bad
Parents: 5daab7f e68eb05
Author: Robert Bradshaw 
Authored: Mon Jul 18 17:46:16 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:46:16 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py|  5 +++--
 sdks/python/apache_beam/pipeline.py | 22 +++-
 sdks/python/apache_beam/pipeline_test.py| 18 
 .../python/apache_beam/transforms/ptransform.py |  2 ++
 4 files changed, 8 insertions(+), 39 deletions(-)
--




[2/2] incubator-beam git commit: Fixing broken example tests

2016-07-18 Thread robertwb
Fixing broken example tests

Nose does not pick up decorated tests unless the decorator name
starts with test_. This resulted in some tests being inadvertently
disabled.

Also, OptionsContext
(https://github.com/aaltay/incubator-beam/blob/python-sdk/sdks/python/apache_beam/utils/options.py#L457)
is not doing what it is supposed to do. Its augment_options() method is
not called, so using OptionsContext does not override the option
values as expected. I will remove this class and a few uses of it in
tests in a follow-up.

Finally, removed contains_in_any_order in favor of equal_to, as
discussed in:
https://github.com/apache/incubator-beam/pull/650#discussion_r70906340

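Dropping `contains_in_any_order` suggests that `equal_to` already compares without regard to element order, which matters because a PCollection has no defined order. A minimal sketch of such a matcher follows; `AssertionFailed` is a hypothetical stand-in for the SDK's error type, and the sort-based comparison is an assumed approach rather than the SDK's code. It requires the elements to be mutually orderable.

```python
class AssertionFailed(Exception):
  """Hypothetical stand-in for the SDK's assertion error type."""


def equal_to(expected):
  """Returns a matcher that passes if `actual` holds the same elements as
  `expected`, in any order, with the same multiplicities."""
  expected = list(expected)

  def _equal(actual):
    # Sorting both sides ignores order but still catches missing, extra,
    # or duplicated elements. Elements must be mutually orderable.
    sorted_expected = sorted(expected)
    sorted_actual = sorted(actual)
    if sorted_expected != sorted_actual:
      raise AssertionFailed(
          'Failed assert: %r == %r' % (sorted_expected, sorted_actual))

  return _equal


matcher = equal_to([('to', ((3, 'to'),)), ('t', ((3, 'to'),))])
matcher([('t', ((3, 'to'),)), ('to', ((3, 'to'),))])  # passes: order ignored
```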

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bfd6813
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bfd6813
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bfd6813

Branch: refs/heads/python-sdk
Commit: 3bfd6813960eb8b6161bac79b96349e4c721bfea
Parents: ad7c216
Author: Ahmet Altay 
Authored: Mon Jul 18 14:24:51 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:47:53 2016 -0700

--
 .../examples/complete/autocomplete_test.py  |  4 +-
 .../examples/complete/estimate_pi_test.py   |  1 +
 .../examples/cookbook/group_with_coder_test.py  | 70 
 sdks/python/apache_beam/transforms/util.py  | 13 
 4 files changed, 45 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bfd6813/sdks/python/apache_beam/examples/complete/autocomplete_test.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index 1b3ee5f..84f947b 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -22,7 +22,7 @@ import unittest
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
 from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import contains_in_any_order
+from apache_beam.transforms.util import equal_to
 
 
 class AutocompleteTest(unittest.TestCase):
@@ -35,7 +35,7 @@ class AutocompleteTest(unittest.TestCase):
     result = words | autocomplete.TopPerPrefix('test', 5)
     # values must be hashable for now
     result = result | beam.Map(lambda (k, vs): (k, tuple(vs)))
-    assert_that(result, contains_in_any_order(
+    assert_that(result, equal_to(
         [
             ('t', ((3, 'to'), (2, 'this'), (1, 'that'))),
             ('to', ((3, 'to'), )),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bfd6813/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
--
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
index 7ca82d7..c633bb1 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py
@@ -17,6 +17,7 @@
 
 """Test for the estimate_pi example."""
 
+import logging
 import unittest
 
 import apache_beam as beam

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bfd6813/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
--
diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
index 07211a9..fb52809 100644
--- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py
@@ -18,13 +18,10 @@
 """Test for the custom coders example."""
 
 import logging
+import tempfile
 import unittest
 
-import apache_beam as beam
 from apache_beam.examples.cookbook import group_with_coder
-from apache_beam.transforms.util import assert_that
-from apache_beam.transforms.util import equal_to
-from apache_beam.utils.options import OptionsContext
 
 
 # Patch group_with_coder.PlayerCoder.decode(). To test that the PlayerCoder was
@@ -39,37 +36,54 @@ class GroupWithCoderTest(unittest.TestCase):
       'joe,20', 'fred,6', 'ann,5',
       'joe,30', 'ann,10', 'mary,1']
 
-  @OptionsContext(pipeline_type_check=True)
-  def test_basics_with_type_check_n(self):
-    # Run the workflow with pipeline_type_check option. This will make sure
+  def create_temp_file(self, records):
+    with tempfile.NamedTemporaryFile(delete=False) as f:
+      for record in records:
+ 

[1/2] incubator-beam git commit: Closes #685

2016-07-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk ad7c216f4 -> 69f895a2e


Closes #685


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69f895a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69f895a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69f895a2

Branch: refs/heads/python-sdk
Commit: 69f895a2e7f5a4ef39c060cfb0320c87e64ea8be
Parents: ad7c216 3bfd681
Author: Robert Bradshaw 
Authored: Mon Jul 18 17:47:53 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:47:53 2016 -0700

--
 .../examples/complete/autocomplete_test.py  |  4 +-
 .../examples/complete/estimate_pi_test.py   |  1 +
 .../examples/cookbook/group_with_coder_test.py  | 70 
 sdks/python/apache_beam/transforms/util.py  | 13 
 4 files changed, 45 insertions(+), 43 deletions(-)
--




[2/2] incubator-beam git commit: Closes #679

2016-07-18 Thread robertwb
Closes #679


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/56109058
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/56109058
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/56109058

Branch: refs/heads/python-sdk
Commit: 5610905803e03bafd575daf340c8e38113bb78cd
Parents: b00f915 a740f82
Author: Robert Bradshaw 
Authored: Mon Jul 18 17:59:00 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:59:00 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py|  2 --
 sdks/python/apache_beam/pipeline.py | 14 +
 sdks/python/apache_beam/pipeline_test.py| 10 +++
 sdks/python/apache_beam/pvalue.py   |  6 ++--
 .../python/apache_beam/transforms/ptransform.py | 31 
 5 files changed, 29 insertions(+), 34 deletions(-)
--




[4/4] incubator-beam git commit: Implement coder optimized for coding primitives.

2016-07-18 Thread robertwb
ders.py 
b/sdks/python/apache_beam/coders/coders.py
index 619586f..cf5ca6d 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -354,6 +354,49 @@ class DeterministicPickleCoder(FastCoder):
     return self
 
 
+class FastPrimitivesCoder(FastCoder):
+  """Encodes simple primitives (e.g. str, int) efficiently.
+
+  For unknown types, falls back to another coder (e.g. PickleCoder).
+  """
+  def __init__(self, fallback_coder):
+    self._fallback_coder = fallback_coder
+
+  def _create_impl(self):
+    return coder_impl.FastPrimitivesCoderImpl(
+        self._fallback_coder.get_impl())
+
+  def is_deterministic(self):
+    return self._fallback_coder.is_deterministic()
+
+  def as_cloud_object(self, is_pair_like=True):
+    value = super(FastCoder, self).as_cloud_object()
+    # We currently use this coder in places where we cannot infer the coder to
+    # use for the value type in a more granular way.  In places where the
+    # service expects a pair, it checks for the "is_pair_like" key, in which
+    # case we would fail without the hack below.
+    if is_pair_like:
+      value['is_pair_like'] = True
+      value['component_encodings'] = [
+          self.as_cloud_object(is_pair_like=False),
+          self.as_cloud_object(is_pair_like=False)
+      ]
+
+    return value
+
+  # We allow .key_coder() and .value_coder() to be called on PickleCoder since
+  # we can't always infer the return values of lambdas in ParDo operations, the
+  # result of which may be used in a GroupBykey.
+  def is_kv_coder(self):
+    return True
+
+  def key_coder(self):
+    return self
+
+  def value_coder(self):
+    return self
+
+
 class Base64PickleCoder(Coder):
   """Coder of objects by Python pickle, then base64 encoding."""
   # TODO(robertwb): Do base64 encoding where it's needed (e.g. in json) rather

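The new coder's docstring describes the idea: encode common primitives compactly and hand anything else to a fallback coder (`PickleCoder` by default, per the follow-up commit below). The actual `FastPrimitivesCoderImpl` lives in `coder_impl.py`, which this excerpt does not show, so the following is only a hypothetical sketch of the technique: every value gets a one-byte type tag, containers are encoded recursively, and unknown types are pickled. The class name, tag values, and byte layout are assumptions, not Beam's wire format.

```python
import pickle
import struct

# Hypothetical one-byte type tags; not Beam's actual encoding.
NONE, INT, FLOAT, BYTES, UNICODE, TUPLE, LIST, FALLBACK = range(8)


class FastPrimitivesCoderSketch(object):
  """Type-tagged encoding for primitives; other values use a fallback."""

  def __init__(self, fallback_dumps=pickle.dumps, fallback_loads=pickle.loads):
    self._dumps = fallback_dumps
    self._loads = fallback_loads

  def encode(self, value):
    out = bytearray()
    self._encode_to(value, out)
    return bytes(out)

  def decode(self, encoded):
    value, pos = self._decode_from(encoded, 0)
    if pos != len(encoded):
      raise ValueError('trailing bytes after encoded value')
    return value

  @staticmethod
  def _put_bytes(data, out):
    out += struct.pack('>I', len(data))  # 4-byte big-endian length prefix
    out += data

  def _encode_to(self, value, out):
    t = type(value)  # exact type: bool and subclasses use the fallback
    if value is None:
      out.append(NONE)
    elif t is int and -2**63 <= value < 2**63:
      out.append(INT)
      out += struct.pack('>q', value)
    elif t is float:
      out.append(FLOAT)
      out += struct.pack('>d', value)
    elif t is bytes:
      out.append(BYTES)
      self._put_bytes(value, out)
    elif t is str:
      out.append(UNICODE)
      self._put_bytes(value.encode('utf-8'), out)
    elif t is tuple or t is list:
      out.append(TUPLE if t is tuple else LIST)
      out += struct.pack('>I', len(value))
      for item in value:  # components are encoded recursively
        self._encode_to(item, out)
    else:
      out.append(FALLBACK)
      self._put_bytes(self._dumps(value), out)

  def _decode_from(self, data, pos):
    tag = data[pos]
    pos += 1
    if tag == NONE:
      return None, pos
    if tag == INT:
      return struct.unpack_from('>q', data, pos)[0], pos + 8
    if tag == FLOAT:
      return struct.unpack_from('>d', data, pos)[0], pos + 8
    (length,) = struct.unpack_from('>I', data, pos)
    pos += 4
    if tag in (TUPLE, LIST):
      items = []
      for _ in range(length):
        item, pos = self._decode_from(data, pos)
        items.append(item)
      return (tuple(items) if tag == TUPLE else items), pos
    payload = bytes(data[pos:pos + length])
    pos += length
    if tag == BYTES:
      return payload, pos
    if tag == UNICODE:
      return payload.decode('utf-8'), pos
    if tag == FALLBACK:
      return self._loads(payload), pos
    raise ValueError('unknown type tag %d' % tag)


coder = FastPrimitivesCoderSketch()
for v in (None, 1, -1, 1.5, b'str\0str', u'unicode\0\u0101',
          (), (1, 2, 3), [], [1, 2, 3], ('a',), {'k': 1}):
  assert coder.decode(coder.encode(v)) == v
print(len(coder.encode(7)))  # 9: one tag byte plus an 8-byte integer
```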
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 0266fdc..b084947 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -111,6 +111,14 @@ class CodersTest(unittest.TestCase):
         coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())),
         (1, cell_value))
 
+  def test_fast_primitives_coder(self):
+    coder = coders.FastPrimitivesCoder(coders.SingletonCoder(len))
+    self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101')
+    self.check_coder(coder, (), (1, 2, 3))
+    self.check_coder(coder, [], [1, 2, 3])
+    self.check_coder(coder, len)
+    self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,))
+
   def test_bytes_coder(self):
     self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
 



[1/4] incubator-beam git commit: Closes #683

2016-07-18 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 69f895a2e -> b00f915ee


Closes #683


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b00f915e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b00f915e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b00f915e

Branch: refs/heads/python-sdk
Commit: b00f915eeeb5bca9addda2a9a5ff500e81954d80
Parents: 69f895a 4c51273
Author: Robert Bradshaw 
Authored: Mon Jul 18 17:48:33 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:48:33 2016 -0700

--
 sdks/python/apache_beam/coders/coder_impl.pxd   | 10 +++
 sdks/python/apache_beam/coders/coder_impl.py| 80 
 sdks/python/apache_beam/coders/coders.py| 43 +++
 .../apache_beam/coders/coders_test_common.py|  9 +++
 sdks/python/apache_beam/coders/typecoders.py|  4 +-
 5 files changed, 144 insertions(+), 2 deletions(-)
--




[2/4] incubator-beam git commit: Used fast primitives coder as fallback coder.

2016-07-18 Thread robertwb
Used fast primitives coder as fallback coder.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64457d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64457d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64457d04

Branch: refs/heads/python-sdk
Commit: 64457d04f60859c7f0ef53c9d847090a9fa754e4
Parents: f58c2bd
Author: Robert Bradshaw 
Authored: Mon Jul 18 14:25:08 2016 -0700
Committer: Robert Bradshaw 
Committed: Mon Jul 18 17:48:33 2016 -0700

--
 sdks/python/apache_beam/coders/coders.py | 2 +-
 sdks/python/apache_beam/coders/typecoders.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/coders.py
--
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index cf5ca6d..1c5043c 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -359,7 +359,7 @@ class FastPrimitivesCoder(FastCoder):
 
   For unknown types, falls back to another coder (e.g. PickleCoder).
   """
-  def __init__(self, fallback_coder):
+  def __init__(self, fallback_coder=PickleCoder()):
     self._fallback_coder = fallback_coder
 
   def _create_impl(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/typecoders.py
--
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 2fba752..74e5770 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -86,9 +86,9 @@ class CoderRegistry(object):
     self._register_coder_internal(bytes, coders.BytesCoder)
     self._register_coder_internal(unicode, coders.StrUtf8Coder)
     self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
+    self._fallback_coder = fallback_coder or coders.FastPrimitivesCoder
     self._register_coder_internal(typehints.AnyTypeConstraint,
-                                  coders.PickleCoder)
-    self._fallback_coder = fallback_coder or coders.PickleCoder
+                                  self._fallback_coder)
 
   def _register_coder_internal(self, typehint_type, typehint_coder_class):
     self._coders[typehint_type] = typehint_coder_class

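With this change, values whose type hint is `Any` (and, presumably, other types the registry cannot resolve) get `FastPrimitivesCoder` rather than plain `PickleCoder`. The diff moves the `self._fallback_coder` assignment above the `AnyTypeConstraint` registration because that registration now reads it. The toy sketch below mirrors that ordering; every class is a stand-in and `coder_class_for` is a hypothetical lookup method, not the registry's real API. It also illustrates a Python detail relevant to `fallback_coder=PickleCoder()`: a default argument is created once and shared, which is harmless as long as the coder is stateless.

```python
class PickleCoder(object):
  """Toy stand-in, assumed stateless."""


class FastPrimitivesCoder(object):
  # A default argument is evaluated once, when `def` runs, so every instance
  # created without an argument shares this single PickleCoder object.
  def __init__(self, fallback_coder=PickleCoder()):
    self._fallback_coder = fallback_coder


class BytesCoder(object):
  """Toy stand-in."""


class AnyTypeConstraint(object):
  """Toy stand-in for typehints.AnyTypeConstraint."""


class CoderRegistry(object):

  def __init__(self, fallback_coder=None):
    self._coders = {}
    self._register_coder_internal(bytes, BytesCoder)
    # The fallback must be chosen before it is registered for Any.
    self._fallback_coder = fallback_coder or FastPrimitivesCoder
    self._register_coder_internal(AnyTypeConstraint, self._fallback_coder)

  def _register_coder_internal(self, typehint_type, typehint_coder_class):
    self._coders[typehint_type] = typehint_coder_class

  def coder_class_for(self, typehint_type):
    # Hypothetical lookup: unregistered types use the fallback coder class.
    return self._coders.get(typehint_type, self._fallback_coder)


registry = CoderRegistry()
print(registry.coder_class_for(dict).__name__)  # FastPrimitivesCoder
```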


[GitHub] incubator-beam pull request #691: Add low-level data structures to be shared...

2016-07-19 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/691

Add low-level data structures to be shared with the worker.

Be sure to do all of the following to help us incorporate your contribution
quickly and easily:

 - [ ] Make sure the PR title is formatted like:
   `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable
   Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue
   number, if there is one.
 - [ ] If this contribution is large, please file an Apache
   [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam cdef-structs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/691.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #691


commit 3653a2b378d848fdc557e008111769f52cad1a23
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-19T19:19:47Z

Cythonize WindowedValue class.

commit ec33ed4b71ca94faa59e399c4a701df6a11e21d6
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-19T19:22:08Z

Add Cython DoFnContext and Receiver stubs.






[GitHub] incubator-beam pull request #696: Cythonize WindowedValue class.

2016-07-19 Thread robertwb
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/696

Cythonize WindowedValue class.




You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam cdef-structs-wv-only

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/696.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #696


commit 862e0650985b4a6436abf49197710cb87037111a
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-19T19:19:47Z

Cythonize WindowedValue class.






[1/2] incubator-beam git commit: Closes #599

2016-07-15 Thread robertwb
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d898d56ae -> 1305c108a


Closes #599


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1305c108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1305c108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1305c108

Branch: refs/heads/python-sdk
Commit: 1305c108a63cb1675fd9bc36fa990bb87033eaeb
Parents: d898d56 c9c31fd
Author: Robert Bradshaw 
Authored: Fri Jul 15 10:23:13 2016 -0700
Committer: Robert Bradshaw 
Committed: Fri Jul 15 10:23:13 2016 -0700

--
 sdks/python/apache_beam/io/filebasedsource.py |  9 +
 sdks/python/apache_beam/io/fileio.py  | 14 ++
 sdks/python/apache_beam/io/gcsio.py   | 15 +++
 sdks/python/apache_beam/io/gcsio_test.py  |  8 
 sdks/python/apache_beam/io/range_trackers.py  |  3 ++-
 sdks/python/apache_beam/io/range_trackers_test.py |  3 +++
 6 files changed, 43 insertions(+), 9 deletions(-)
--




[2/2] incubator-beam git commit: Fixes several issues related to 'filebasedsource'.

2016-07-15 Thread robertwb
Fixes several issues related to 'filebasedsource'.

Adds a method 'fileio.ChannelFactory.size_in_bytes()' that can be used to
determine the size of a single file.
Implements this method in 'ChannelFactory' for both GCS and local files.
Updates 'filebasedsource' to use this method when determining the sizes of
files.

Fixes a small bug in 'OffsetRangeTracker'.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9c31fdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9c31fdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9c31fdf

Branch: refs/heads/python-sdk
Commit: c9c31fdffbe2365a8dedd3154726ab1c01cfa889
Parents: d898d56
Author: Chamikara Jayalath 
Authored: Wed Jul 6 20:25:04 2016 -0700
Committer: Robert Bradshaw 
Committed: Fri Jul 15 10:23:13 2016 -0700

--
 sdks/python/apache_beam/io/filebasedsource.py |  9 +
 sdks/python/apache_beam/io/fileio.py  | 14 ++
 sdks/python/apache_beam/io/gcsio.py   | 15 +++
 sdks/python/apache_beam/io/gcsio_test.py  |  8 
 sdks/python/apache_beam/io/range_trackers.py  |  3 ++-
 sdks/python/apache_beam/io/range_trackers_test.py |  3 +++
 6 files changed, 43 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index c877e44..aa0820d 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,6 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
 from multiprocessing.pool import ThreadPool
-import os
 import range_trackers
 
 from apache_beam.io import fileio
@@ -131,13 +130,7 @@ class FileBasedSource(iobase.BoundedSource):
   def _estimate_sizes_in_parallel(file_names):
 
     def _calculate_size_of_file(file_name):
-      f = fileio.ChannelFactory.open(
-          file_name, 'rb', 'application/octet-stream')
-      try:
-        f.seek(0, os.SEEK_END)
-        return f.tell()
-      finally:
-        f.close()
+      return fileio.ChannelFactory.size_in_bytes(file_name)
 
     return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map(
         _calculate_size_of_file, file_names)

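The refactoring replaces "open the file, seek to the end, call `tell()`" with a size lookup that does not need to open the file. A minimal sketch of the same pattern for local files, keeping the `ThreadPool` fan-out from `_estimate_sizes_in_parallel`. The `gcs_size` hook and the `max_threads` default are assumptions introduced so the example runs without Google Cloud Storage; they are not SDK parameters.

```python
import os
from multiprocessing.pool import ThreadPool


def size_in_bytes(path, gcs_size=None):
  """Local analogue of ChannelFactory.size_in_bytes.

  `gcs_size` is a hypothetical hook standing in for GcsIO().size(path).
  """
  if path.startswith('gs://'):
    if gcs_size is None:
      raise ValueError('no GCS size function supplied for %s' % path)
    return gcs_size(path)
  # A metadata (stat) lookup: no need to open the file and seek to its end.
  return os.path.getsize(path)


def estimate_sizes_in_parallel(file_names, max_threads=8, gcs_size=None):
  # Each lookup mostly waits on I/O (a stat call or an HTTP request), so a
  # thread pool overlaps that waiting despite the GIL. `max_threads` is an
  # illustrative default, not the SDK's MAX_NUM_THREADS_FOR_SIZE_ESTIMATION.
  pool = ThreadPool(max(1, min(max_threads, len(file_names))))
  try:
    # map() preserves input order, so sizes line up with file_names.
    return pool.map(lambda name: size_in_bytes(name, gcs_size), file_names)
  finally:
    pool.close()
    pool.join()
```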
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 31b6a93..f532077 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -255,6 +255,20 @@ class ChannelFactory(object):
     else:
       return glob.glob(path)
 
+  @staticmethod
+  def size_in_bytes(path):
+    """Returns the size of a file in bytes.
+
+    Args:
+      path: a string that gives the path of a single file.
+    """
+    if path.startswith('gs://'):
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.io import gcsio
+      return gcsio.GcsIO().size(path)
+    else:
+      return os.path.getsize(path)
+
 
 class _CompressionType(object):
   """Object representing single compression type."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio.py
--
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index c61f251..10409c9 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -236,6 +236,21 @@ class GcsIO(object):
     except IOError:
       return False
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def size(self, path):
+    """Returns the size of a single GCS object.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: size of the GCS object in bytes.
+    """
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(bucket=bucket,
+                                               object=object_path)
+    return self.client.objects.Get(request).size
+
 
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""

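`GcsIO.size` is decorated with `retry.with_exponential_backoff(retry_filter=retry.retry_on_server_errors_and_timeout_filter)`, so transient server errors and timeouts are retried while, presumably, other failures surface immediately. That helper's implementation is not part of this diff; the decorator below is a hypothetical, self-contained sketch of the general technique, with its parameter names, defaults, and the `ServerError` type all assumed. `sleep` is injectable so the behavior can be checked without waiting.

```python
import functools
import time


def with_exponential_backoff(num_retries=4, initial_delay_secs=1.0,
                             factor=2.0, retry_filter=lambda exn: True,
                             sleep=time.sleep):
  """Hypothetical retry decorator; names and defaults are assumptions."""

  def decorator(fun):

    @functools.wraps(fun)
    def wrapper(*args, **kwargs):
      delay = initial_delay_secs
      for attempt in range(num_retries + 1):
        try:
          return fun(*args, **kwargs)
        except Exception as exn:  # pylint: disable=broad-except
          # Give up on the last attempt, or on errors the filter rejects
          # (e.g. a missing object should not be retried).
          if attempt == num_retries or not retry_filter(exn):
            raise
          sleep(delay)
          delay *= factor  # 1s, 2s, 4s, ... between attempts

    return wrapper

  return decorator


class ServerError(Exception):
  """Stand-in for a retryable HTTP 5xx response."""


def retry_on_server_errors(exn):
  return isinstance(exn, ServerError)


attempts = []


@with_exponential_backoff(retry_filter=retry_on_server_errors,
                          sleep=lambda secs: None)
def flaky_size(path):
  attempts.append(path)
  if len(attempts) < 3:
    raise ServerError('503')
  return 1024


print(flaky_size('gs://bucket/object'), len(attempts))  # 1024 3
```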
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio_test.py

[GitHub] incubator-beam pull request #657: Accept runners by fully qualified name.

2016-07-14 Thread robertwb
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/657



