incubator-beam-site git commit: Remove duplicate paragraph.
Repository: incubator-beam-site Updated Branches: refs/heads/asf-site 87d9a07fe -> edd0705b8 Remove duplicate paragraph. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/edd0705b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/edd0705b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/edd0705b Branch: refs/heads/asf-site Commit: edd0705b82799a8e000c995da9c099023f2ccaa8 Parents: 87d9a07 Author: Robert Bradshaw Authored: Fri May 27 11:18:51 2016 -0700 Committer: Robert Bradshaw Committed: Fri May 27 11:22:21 2016 -0700 -- _posts/2016-05-20-where-is-my-pcollection-dot-map.md | 2 -- content/blog/2016/05/27/where-is-my-pcollection-dot-map.html | 2 -- 2 files changed, 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/edd0705b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md -- diff --git a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md index f7ea468..e9232d3 100644 --- a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md +++ b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md @@ -15,8 +15,6 @@ Though Beam is relatively new, its design draws heavily on many years of experie The original FlumeJava API has methods like `count` and `parallelDo` on the PCollections. Though slightly more succinct, this approach has many disadvantages to extensibility. Every new user to FlumeJava wanted to add transforms, and adding them as methods to PCollection simply doesn't scale well. In contrast, a PCollection in Beam has a single `apply` method which takes any PTransform as an argument. -Have you ever wondered why Beam has PTransforms for everything instead of having methods on PCollection? Take a look at the history that led to this (and other) design decisions. 
- FlumeJava http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/edd0705b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html -- diff --git a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html index 9d69ac3..2af12f5 100644 --- a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html +++ b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html @@ -112,8 +112,6 @@ The original FlumeJava API has methods like count and parallelDo on the PCollections. Though slightly more succinct, this approach has many disadvantages to extensibility. Every new user to FlumeJava wanted to add transforms, and adding them as methods to PCollection simply doesn’t scale well. In contrast, a PCollection in Beam has a single apply method which takes any PTransform as an argument. -Have you ever wondered why Beam has PTransforms for everything instead of having methods on PCollection? Take a look at the history that led to this (and other) design decisions. - FlumeJava
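The contrast the post draws, methods on the collection versus a single `apply` that accepts any PTransform, can be sketched in a few lines of plain Python. This is a toy model, not the SDK's implementation: `PCollection`, `PTransform`, `Map`, `Count`, and `CountWords` here are simplified stand-ins that only hold an in-memory list. The `|` operator mirrors the `pcoll | transform` style used by the Beam Python SDK.

```python
# Toy model of the design described in the post: the collection has one
# generic entry point (`apply`), and every operation is a separate
# transform object. Not Beam's actual classes.


class PTransform:
    """Base class for transforms in this sketch."""

    def expand(self, pcoll):
        raise NotImplementedError


class PCollection:
    def __init__(self, elements):
        self.elements = list(elements)

    def apply(self, transform):
        # The only operation method: all behaviour lives in the transform.
        return transform.expand(self)

    def __or__(self, transform):
        # Pipe syntax, mirroring `pcoll | transform` in the Python SDK.
        return self.apply(transform)


class Map(PTransform):
    def __init__(self, fn):
        self.fn = fn

    def expand(self, pcoll):
        return PCollection(self.fn(x) for x in pcoll.elements)


class Count(PTransform):
    def expand(self, pcoll):
        return PCollection([len(pcoll.elements)])


class CountWords(PTransform):
    """A user-defined composite: no change to PCollection is needed."""

    def expand(self, pcoll):
        words = PCollection(w for line in pcoll.elements for w in line.split())
        return words | Count()


result = PCollection(['a b', 'c']) | CountWords()  # result.elements == [3]
```

Adding `CountWords` required no change to `PCollection`, which is the extensibility argument in miniature; with a method-based API, each new operation would have to be added to the collection class itself.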
incubator-beam-site git commit: Fix blog post typo.
Repository: incubator-beam-site Updated Branches: refs/heads/asf-site edd0705b8 -> 0c5c6471b Fix blog post typo. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/commit/0c5c6471 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/tree/0c5c6471 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam-site/diff/0c5c6471 Branch: refs/heads/asf-site Commit: 0c5c6471b8c6e632d3270e132f1514560319a92d Parents: edd0705 Author: Robert Bradshaw Authored: Fri May 27 13:56:58 2016 -0700 Committer: Robert Bradshaw Committed: Fri May 27 13:56:58 2016 -0700 -- _posts/2016-05-20-where-is-my-pcollection-dot-map.md | 2 +- content/blog/2016/05/27/where-is-my-pcollection-dot-map.html | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/0c5c6471/_posts/2016-05-20-where-is-my-pcollection-dot-map.md -- diff --git a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md index e9232d3..5fc13f0 100644 --- a/_posts/2016-05-20-where-is-my-pcollection-dot-map.md +++ b/_posts/2016-05-20-where-is-my-pcollection-dot-map.md @@ -82,7 +82,7 @@ As pipelines grow and evolve, it is useful to structure your pipeline into modul -Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flag view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms. +Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flat view gives the full DAG of all operations performed. 
The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms. ## Summary http://git-wip-us.apache.org/repos/asf/incubator-beam-site/blob/0c5c6471/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html -- diff --git a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html index 2af12f5..9f50244 100644 --- a/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html +++ b/content/blog/2016/05/27/where-is-my-pcollection-dot-map.html @@ -179,7 +179,7 @@ PCollectionO output = input -Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flag view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms. +Three different visualizations of a simple WordCount pipeline which computes the number of occurrences of every word in a set of text files. The flat view gives the full DAG of all operations performed. The execution view groups operations according to how they're executed, e.g. after performing runner-specific optimizations like function composition. The structured view nests operations according to their grouping in PTransforms. Summary
[GitHub] incubator-beam pull request: Optimize the Count CombineFn
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/88 Optimize the Count CombineFn Previously the accumulator was stored as a Long. This uses a singleton long[] to avoid the boxing and unboxing on every increment. This required changing the Coder (the format actually remains the same, but we have no way of declaring that) so is not backwards compatible with reload. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam count Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/88.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #88 commit f2bb6e86bfd71f5cda4f574c6bedfaeff172291a Author: Robert Bradshaw <rober...@google.com> Date: 2016-03-29T19:28:13Z Optimize the Count CombineFn Previously the accumulator was stored as a Long. This uses a singleton long[] to avoid the boxing and unboxing on every increment. This required changing the Coder (the format actually remains the same, but we have no way of declaring that) so is not backwards compatible with reload. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
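The pull request concerns the Java SDK, where a `Long` accumulator is boxed and unboxed on every increment. The sketch below only mirrors the shape of the change in Python, the language used by the other examples here: the accumulator is a one-element list that `add_input` mutates in place, rather than an immutable number replaced on each input. The class name `CountFnSketch` is hypothetical; its four methods follow the Python SDK's CombineFn method names (`create_accumulator`, `add_input`, `merge_accumulators`, `extract_output`), and no performance claim is implied for Python.

```python
# Hypothetical counting combiner following the CombineFn method names.
# The accumulator is a one-element list updated in place, analogous to
# the singleton long[] used in the Java change.


class CountFnSketch(object):
    def create_accumulator(self):
        return [0]

    def add_input(self, accumulator, element):
        # Mutate the existing accumulator instead of producing a new number.
        accumulator[0] += 1
        return accumulator

    def merge_accumulators(self, accumulators):
        merged = self.create_accumulator()
        for acc in accumulators:
            merged[0] += acc[0]
        return merged

    def extract_output(self, accumulator):
        return accumulator[0]


# Simulate two bundles that are accumulated separately, then merged.
fn = CountFnSketch()
partials = []
for bundle in (['a', 'b'], ['c']):
    acc = fn.create_accumulator()
    for element in bundle:
        acc = fn.add_input(acc, element)
    partials.append(acc)
total = fn.extract_output(fn.merge_accumulators(partials))  # 3
```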
[2/3] incubator-beam git commit: Update docstring.
Update docstring. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/76f3864b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/76f3864b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/76f3864b Branch: refs/heads/python-sdk Commit: 76f3864b61ad9e2ac9a2e81e7a0d5993db5fde6c Parents: 8a7bc71 Author: Charles Chen Authored: Mon Jul 25 18:36:45 2016 -0700 Committer: Charles Chen Committed: Mon Jul 25 18:36:45 2016 -0700 -- sdks/python/apache_beam/utils/dependency.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/76f3864b/sdks/python/apache_beam/utils/dependency.py -- diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py index d8e0036..7d1ae41 100644 --- a/sdks/python/apache_beam/utils/dependency.py +++ b/sdks/python/apache_beam/utils/dependency.py @@ -1,3 +1,4 @@ + # # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with @@ -439,7 +440,7 @@ def get_required_container_version(): def get_sdk_name_and_version(): - """Returns the name and version of the SDK reported to Cloud Dataflow.""" + """Returns name and version of SDK reported to Google Cloud Dataflow.""" # TODO(ccy): Make this check cleaner. container_version = get_required_container_version() if container_version == 'beamhead':
[3/3] incubator-beam git commit: Closes #730
Closes #730 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26ff6579 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26ff6579 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26ff6579 Branch: refs/heads/python-sdk Commit: 26ff6579531d598e91b220fa21ec9bd7f9220f78 Parents: 153916f 76f3864 Author: Robert Bradshaw Authored: Mon Jul 25 21:11:31 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 25 21:11:31 2016 -0700 -- sdks/python/apache_beam/internal/apiclient.py | 6 +++--- sdks/python/apache_beam/utils/dependency.py | 12 2 files changed, 15 insertions(+), 3 deletions(-) --
[1/3] incubator-beam git commit: Fix SDK name and version sent to the Cloud Dataflow service
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 153916fe9 -> 26ff65795 Fix SDK name and version sent to the Cloud Dataflow service Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8a7bc71d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8a7bc71d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8a7bc71d Branch: refs/heads/python-sdk Commit: 8a7bc71dee1e3f9f17c29e5e558870a6bc5f4880 Parents: 38d9dea Author: Charles Chen Authored: Mon Jul 25 18:22:08 2016 -0700 Committer: Charles Chen Committed: Mon Jul 25 18:22:08 2016 -0700 -- sdks/python/apache_beam/internal/apiclient.py | 6 +++--- sdks/python/apache_beam/utils/dependency.py | 11 +++ 2 files changed, 14 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a7bc71d/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 363b8e1..137a40b 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -29,7 +29,6 @@ from apitools.base.py import encoding from apitools.base.py import exceptions from apache_beam import utils -from apache_beam import version from apache_beam.internal import pickler from apache_beam.internal.auth import get_service_credentials from apache_beam.internal.json_value import to_json_value @@ -39,6 +38,7 @@ from apache_beam.utils import dependency from apache_beam.utils import names from apache_beam.utils import retry from apache_beam.utils.dependency import get_required_container_version +from apache_beam.utils.dependency import get_sdk_name_and_version from apache_beam.utils.names import PropertyNames from apache_beam.utils.options import GoogleCloudOptions from apache_beam.utils.options import StandardOptions @@ -191,12 +191,12 @@ class Environment(object): 
self.proto.userAgent = dataflow.Environment.UserAgentValue() self.local = 'localhost' in self.google_cloud_options.dataflow_endpoint -version_string = version.__version__ +sdk_name, version_string = get_sdk_name_and_version() self.proto.userAgent.additionalProperties.extend([ dataflow.Environment.UserAgentValue.AdditionalProperty( key='name', -value=to_json_value('Google Cloud Dataflow SDK for Python')), +value=to_json_value(sdk_name)), dataflow.Environment.UserAgentValue.AdditionalProperty( key='version', value=to_json_value(version_string))]) # Version information. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8a7bc71d/sdks/python/apache_beam/utils/dependency.py -- diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py index b809cf2..d8e0036 100644 --- a/sdks/python/apache_beam/utils/dependency.py +++ b/sdks/python/apache_beam/utils/dependency.py @@ -59,6 +59,7 @@ import tempfile from apache_beam import utils +from apache_beam import version as beam_version from apache_beam.internal import pickler from apache_beam.utils import names from apache_beam.utils import processes @@ -437,6 +438,16 @@ def get_required_container_version(): return 'beamhead' +def get_sdk_name_and_version(): + """Returns the name and version of the SDK reported to Cloud Dataflow.""" + # TODO(ccy): Make this check cleaner. + container_version = get_required_container_version() + if container_version == 'beamhead': +return ('Apache Beam SDK for Python', beam_version.__version__) + else: +return ('Google Cloud Dataflow SDK for Python', container_version) + + def _download_pypi_sdk_package(temp_dir): """Downloads SDK package from PyPI and returns path to local path.""" # TODO(silviuc): Handle apache-beam versions when we have official releases.
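The selection logic added in `dependency.py` reads more easily when un-flattened. The sketch below restates it as a pure function; taking the container version and the Beam version as parameters is an assumption made so it runs without the SDK (the real `get_sdk_name_and_version()` calls `get_required_container_version()` and reads `beam_version.__version__`). The name `sdk_name_and_version` is hypothetical.

```python
# Restatement of the branch added in dependency.py, with its inputs passed
# in as parameters (an assumption for testability).

BEAM_HEAD = 'beamhead'


def sdk_name_and_version(container_version, beam_version):
    """Return the (name, version) pair reported to the service."""
    if container_version == BEAM_HEAD:
        # Running from Apache Beam sources: report the Beam SDK version.
        return ('Apache Beam SDK for Python', beam_version)
    # Otherwise the pinned container version identifies the release.
    return ('Google Cloud Dataflow SDK for Python', container_version)
```

`apiclient.py` then places the returned pair in the job's `userAgent` properties under the keys `name` and `version`, replacing the previously hard-coded 'Google Cloud Dataflow SDK for Python' string.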
[1/2] incubator-beam git commit: Made checksum_output optional in bigshuffle.py.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 95a591e05 -> 67a769a9a Made checksum_output optional in bigshuffle.py. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b782ded Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b782ded Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b782ded Branch: refs/heads/python-sdk Commit: 2b782deddefc4ddf437cc4623ccb461332f0fe20 Parents: 95a591e Author: Marian Dvorsky Authored: Mon Jul 11 11:47:01 2016 -0700 Committer: Robert Bradshaw Committed: Tue Jul 12 14:26:26 2016 -0700 -- .../apache_beam/examples/cookbook/bigshuffle.py | 35 ++-- 1 file changed, 18 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2b782ded/sdks/python/apache_beam/examples/cookbook/bigshuffle.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py index 8cbaa40..692bd52 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py @@ -42,7 +42,6 @@ def run(argv=None): required=True, help='Output file pattern to write results to.') parser.add_argument('--checksum_output', - required=True, help='Checksum output file pattern.') known_args, pipeline_args = parser.parse_known_args(argv) @@ -59,24 +58,26 @@ def run(argv=None): 'format', lambda (key, vals): ['%s%s' % (key, val) for val in vals])) - input_csum = (lines -| beam.Map('input-csum', crc32line) -| beam.CombineGlobally('combine-input-csum', sum) -| beam.Map('hex-format', lambda x: '%x' % x)) - input_csum | beam.io.Write( - 'write-input-csum', - beam.io.TextFileSink(known_args.checksum_output + '-input')) - # Write the output using a "Write" transform that has side effects.
output | beam.io.Write('write', beam.io.TextFileSink(known_args.output)) - # Write the output checksum - output_csum = (output - | beam.Map('output-csum', crc32line) - | beam.CombineGlobally('combine-output-csum', sum) - | beam.Map('hex-format-output', lambda x: '%x' % x)) - output_csum | beam.io.Write( - 'write-output-csum', - beam.io.TextFileSink(known_args.checksum_output + '-output')) + + # Optionally write the input and output checksums. + if known_args.checksum_output: +input_csum = (lines + | beam.Map('input-csum', crc32line) + | beam.CombineGlobally('combine-input-csum', sum) + | beam.Map('hex-format', lambda x: '%x' % x)) +input_csum | beam.io.Write( +'write-input-csum', +beam.io.TextFileSink(known_args.checksum_output + '-input')) + +output_csum = (output + | beam.Map('output-csum', crc32line) + | beam.CombineGlobally('combine-output-csum', sum) + | beam.Map('hex-format-output', lambda x: '%x' % x)) +output_csum | beam.io.Write( +'write-output-csum', +beam.io.TextFileSink(known_args.checksum_output + '-output')) # Actually run the pipeline (all operations above are deferred). p.run()
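Two pieces make the checksum optional: without `required=True`, argparse sets `checksum_output` to `None` when the flag is omitted, and the checksum branch runs only when it is set. The checksum itself is a per-line CRC, summed globally and formatted as hex. The diff does not show `crc32line`, so the definition below (unsigned CRC-32 via `zlib`) is an assumption, and the helpers `parse_args` and `checksum_hex` are hypothetical; the sketch computes in plain Python rather than with Beam transforms.

```python
import argparse
import zlib


def parse_args(argv):
    """Hypothetical helper mirroring the flags in bigshuffle.py."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--input', required=True)
    parser.add_argument('--output', required=True,
                        help='Output file pattern to write results to.')
    # No required=True: argparse leaves this as None when the flag is absent.
    parser.add_argument('--checksum_output',
                        help='Checksum output file pattern.')
    # Unrecognized flags (e.g. runner options) are returned separately.
    return parser.parse_known_args(argv)


def crc32line(line):
    # Assumed definition: unsigned CRC-32 of one line (bytes).
    return zlib.crc32(line) & 0xffffffff


def checksum_hex(lines):
    # Per-line CRC, global sum, then hex formatting, as in the pipeline.
    return '%x' % sum(crc32line(line) for line in lines)


known_args, pipeline_args = parse_args(
    ['--input=in.txt', '--output=out', '--runner=DirectRunner'])
if known_args.checksum_output:
    # Only reached when --checksum_output is supplied.
    print(checksum_hex([b'first line', b'second line']))
```

Because addition is commutative and associative, the global sum does not depend on how lines are split across workers or the order in which they arrive, which is what lets `CombineGlobally(sum)` compute it in parallel.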
[2/2] incubator-beam git commit: Closes #625
Closes #625 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/67a769a9 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/67a769a9 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/67a769a9 Branch: refs/heads/python-sdk Commit: 67a769a9a526397a44e01eda16e1941189dc654c Parents: 95a591e 2b782de Author: Robert Bradshaw Authored: Tue Jul 12 14:26:27 2016 -0700 Committer: Robert Bradshaw Committed: Tue Jul 12 14:26:27 2016 -0700 -- .../apache_beam/examples/cookbook/bigshuffle.py | 35 ++-- 1 file changed, 18 insertions(+), 17 deletions(-) --
[2/2] incubator-beam git commit: Closes #641
Closes #641 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e8c39c79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e8c39c79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e8c39c79 Branch: refs/heads/python-sdk Commit: e8c39c798e00444d5b1f6e53c3743fc41ac57b99 Parents: 67a769a f858ea9 Author: Robert Bradshaw Authored: Thu Jul 14 10:17:10 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:17:10 2016 -0700 -- sdks/python/apache_beam/examples/cookbook/bigshuffle.py | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) --
[1/2] incubator-beam git commit: Add type hints to bigshuffle to avoid pickle overhead.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 67a769a9a -> e8c39c798 Add type hints to bigshuffle to avoid pickle overhead. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f858ea93 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f858ea93 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f858ea93 Branch: refs/heads/python-sdk Commit: f858ea9335b38c67778f47de63e1d1d16dc79fee Parents: 67a769a Author: Robert Bradshaw Authored: Tue Jul 12 13:05:46 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:17:09 2016 -0700 -- sdks/python/apache_beam/examples/cookbook/bigshuffle.py | 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f858ea93/sdks/python/apache_beam/examples/cookbook/bigshuffle.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py index 692bd52..0b5da02 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigshuffle.py +++ b/sdks/python/apache_beam/examples/cookbook/bigshuffle.py @@ -48,11 +48,14 @@ def run(argv=None): p = beam.Pipeline(argv=pipeline_args) # Read the text file[pattern] into a PCollection. - lines = p | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) + lines = p | beam.io.Read( + 'read', beam.io.TextFileSource(known_args.input, + coder=beam.coders.BytesCoder())) # Count the occurrences of each word. output = (lines -| beam.Map('split', lambda x: (x[:10], x[10:99])) +| beam.Map('split', lambda x: (x[:10], x[10:99]) + ).with_output_types(beam.typehints.KV[str, str]) | beam.GroupByKey('group') | beam.FlatMap( 'format',
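The commit reads the input with `BytesCoder` and declares the split step's output as `KV[str, str]` so that, per the commit title, elements avoid pickling overhead. Roughly, the idea is that when the element type is known, a compact type-specific encoding can replace a generic pickle of an arbitrary object. The sketch below illustrates that with a hypothetical length-prefixed layout for a `(bytes, bytes)` pair (Python 3 `bytes` standing in for the Python 2 `str` in the diff); it is not the wire format of any Beam coder. The 10-byte key and the value ending at byte 99 mirror the `(x[:10], x[10:99])` split.

```python
import pickle
import struct

# Hypothetical length-prefixed layout for a (bytes, bytes) pair: each part
# is a 4-byte big-endian length followed by the raw bytes. Illustrative
# only; this is not the wire format of any Beam coder.


def encode_kv(key, value):
    return (struct.pack('>I', len(key)) + key +
            struct.pack('>I', len(value)) + value)


def decode_kv(data):
    key_len = struct.unpack_from('>I', data, 0)[0]
    key = data[4:4 + key_len]
    offset = 4 + key_len
    value_len = struct.unpack_from('>I', data, offset)[0]
    value = data[offset + 4:offset + 4 + value_len]
    return key, value


def generic_encode(obj):
    # Type-agnostic fallback: works for any picklable object.
    return pickle.dumps(obj)


# Split a record the way the pipeline does: 10-byte key, value up to byte 99.
record = b'0123456789' + b'x' * 120
key, value = record[:10], record[10:99]
encoded = encode_kv(key, value)
assert decode_kv(encoded) == (key, value)
print(len(encoded), len(generic_encode((key, value))))
```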
[2/2] incubator-beam git commit: Fix typo in Dataflow runner monitoring message
Fix typo in Dataflow runner monitoring message Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ffbaccaa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ffbaccaa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ffbaccaa Branch: refs/heads/python-sdk Commit: ffbaccaa823149ba76e95e1996f9c7a2ba606d3b Parents: e8c39c7 Author: Charles Chen Authored: Tue Jul 12 22:00:22 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:18:05 2016 -0700 -- sdks/python/apache_beam/internal/apiclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ffbaccaa/sdks/python/apache_beam/internal/apiclient.py -- diff --git a/sdks/python/apache_beam/internal/apiclient.py b/sdks/python/apache_beam/internal/apiclient.py index 99c7090..363b8e1 100644 --- a/sdks/python/apache_beam/internal/apiclient.py +++ b/sdks/python/apache_beam/internal/apiclient.py @@ -466,7 +466,7 @@ class DataflowApplicationClient(object): # The response is a Job proto with the id for the new job. logging.info('Created job with id: [%s]', response.id) logging.info( -'To accesss the Dataflow monitoring console, please navigate to ' +'To access the Dataflow monitoring console, please navigate to ' 'https://console.developers.google.com/project/%s/dataflow/job/%s', self.google_cloud_options.project, response.id)
[2/2] incubator-beam git commit: Closes #652
Closes #652 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2b9d81fc Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2b9d81fc Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2b9d81fc Branch: refs/heads/python-sdk Commit: 2b9d81fcc87ebd3bfa484c034fe46f79b63beef2 Parents: 52c0d89 cd0178b Author: Robert Bradshaw Authored: Thu Jul 14 10:19:06 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:19:06 2016 -0700 -- sdks/python/setup.py | 1 + 1 file changed, 1 insertion(+) --
[1/2] incubator-beam git commit: Added cy_combiners.py to the list of Cythonized Python SDK files.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 52c0d893e -> 2b9d81fcc Added cy_combiners.py to the list of Cythonized Python SDK files. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/cd0178b1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/cd0178b1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/cd0178b1 Branch: refs/heads/python-sdk Commit: cd0178b1a2918a15005bdd485b0a554b1e3e6378 Parents: 52c0d89 Author: Marian Dvorsky Authored: Wed Jul 13 17:16:58 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:19:05 2016 -0700 -- sdks/python/setup.py | 1 + 1 file changed, 1 insertion(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/cd0178b1/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index 789d976..2287b8e 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -75,6 +75,7 @@ setuptools.setup( '**/*.pyx', 'apache_beam/coders/coder_impl.py', 'apache_beam/runners/common.py', +'apache_beam/transforms/cy_combiners.py', 'apache_beam/utils/counters.py', ]), install_requires=REQUIRED_PACKAGES,
[GitHub] incubator-beam pull request #641: Add type hints to bigshuffle to avoid pick...
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/641
[1/2] incubator-beam git commit: Closes #644
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e8c39c798 -> 52c0d893e Closes #644 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52c0d893 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52c0d893 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52c0d893 Branch: refs/heads/python-sdk Commit: 52c0d893e73d8dae05acc5f5a6f16840ea79404f Parents: e8c39c7 ffbacca Author: Robert Bradshaw Authored: Thu Jul 14 10:18:05 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:18:05 2016 -0700 -- sdks/python/apache_beam/internal/apiclient.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) --
[1/2] incubator-beam git commit: Temporarily reverting pickler changes (@4e2d8ab).
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 2b9d81fcc -> 7c0c27ac0 Temporarily reverting pickler changes (@4e2d8ab). It is causing infinite recursion when some tests are directly invoked outside the testing framework. Added one such test to tox tests for testing failing case (directly by main function). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/245facdb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/245facdb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/245facdb Branch: refs/heads/python-sdk Commit: 245facdbc166298cdb96521165a7514653fa8e57 Parents: 2b9d81f Author: Ahmet Altay Authored: Wed Jul 13 10:18:03 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:20:29 2016 -0700 -- sdks/python/apache_beam/internal/pickler.py | 18 -- sdks/python/tox.ini | 1 + 2 files changed, 1 insertion(+), 18 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/245facdb/sdks/python/apache_beam/internal/pickler.py -- diff --git a/sdks/python/apache_beam/internal/pickler.py b/sdks/python/apache_beam/internal/pickler.py index f427aa5..898e04b 100644 --- a/sdks/python/apache_beam/internal/pickler.py +++ b/sdks/python/apache_beam/internal/pickler.py @@ -159,24 +159,6 @@ if 'save_module' in dir(dill.dill): return old_save_module_dict(pickler, obj) dill.dill.save_module_dict = new_save_module_dict - - old_save_function = dill.dill.save_function - - @dill.dill.register(types.FunctionType) - def new_save_function(pickler, obj): -globs = obj.__globals__ if dill.dill.PY3 else obj.func_globals -if (dill.dill.is_dill(pickler) and globs == pickler._main.__dict__ -and not pickler._recurse): - try: -pickler._recurse = True -return old_save_function(pickler, obj) - finally: -pickler._recurse = False -else: - return old_save_function(pickler, obj) - dill.dill.save_function = new_save_function - - def 
_nest_dill_logging(): """Prefix all dill logging with its depth in the callstack. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/245facdb/sdks/python/tox.ini -- diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini index 29674ed..5a2572e 100644 --- a/sdks/python/tox.ini +++ b/sdks/python/tox.ini @@ -31,6 +31,7 @@ deps= pep8 pylint commands = + python apache_beam/examples/complete/autocomplete_test.py python setup.py test {toxinidir}/run_pylint.sh passenv = TRAVIS*
[2/2] incubator-beam git commit: Closes #645
Closes #645 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c0c27ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c0c27ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c0c27ac Branch: refs/heads/python-sdk Commit: 7c0c27ac00fd0ab6b8c2b982540e644485efc60e Parents: 2b9d81f 245facd Author: Robert Bradshaw Authored: Thu Jul 14 10:20:59 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 10:20:59 2016 -0700 -- sdks/python/apache_beam/internal/pickler.py | 18 -- sdks/python/tox.ini | 1 + 2 files changed, 1 insertion(+), 18 deletions(-) --
[1/2] incubator-beam git commit: DoOutputsTuple cleanup
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 7c0c27ac0 -> 762a2930a DoOutputsTuple cleanup This avoids the same (logical) PCollection from being re-created with a different producer. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e2eb3b52 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e2eb3b52 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e2eb3b52 Branch: refs/heads/python-sdk Commit: e2eb3b520ce742bbf4aef63046ed2dfdb96c32e5 Parents: 7c0c27a Author: Robert Bradshaw Authored: Thu Jul 14 11:03:41 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 11:03:41 2016 -0700 -- sdks/python/apache_beam/pvalue.py | 28 1 file changed, 16 insertions(+), 12 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e2eb3b52/sdks/python/apache_beam/pvalue.py -- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 8552a45..6fc3041 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -58,15 +58,15 @@ class PValue(object): self.producer = None def __str__(self): -return '<%s>' % self._str_internal() +return self._str_internal() def __repr__(self): return '<%s at %s>' % (self._str_internal(), hex(id(self))) def _str_internal(self): -return '%s transform=%s' % ( -self.__class__.__name__, -self.producer.transform if self.producer else 'n/a') +return "%s[%s.%s]" % (self.__class__.__name__, + self.producer.full_label if self.producer else None, + self.tag) def apply(self, *args, **kwargs): """Applies a transform or callable to a PValue. @@ -191,16 +191,20 @@ class DoOutputsTuple(object): # Check if we accessed this tag before. 
if tag in self._pcolls: return self._pcolls[tag] + if tag is not None: self._transform.side_output_tags.add(tag) -pcoll = PCollection(self._pipeline, tag=tag) -# Transfer the producer from the DoOutputsTuple to the resulting -# PCollection. -pcoll.producer = self.producer -# Add this as an output to both the inner ParDo and the outer _MultiParDo -# PTransforms. -self.producer.parts[0].add_output(pcoll, tag) -self.producer.add_output(pcoll, tag) + pcoll = PCollection(self._pipeline, tag=tag) + # Transfer the producer from the DoOutputsTuple to the resulting + # PCollection. + pcoll.producer = self.producer + # Add this as an output to both the inner ParDo and the outer _MultiParDo + # PTransforms. + self.producer.parts[0].add_output(pcoll, tag) + self.producer.add_output(pcoll, tag) +else: + # Main output is output of inner ParDo. + pcoll = self.producer.parts[0].outputs[0] self._pcolls[tag] = pcoll return pcoll
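The `DoOutputsTuple` change caches outputs by tag so each logical PCollection is created once, and for the main (untagged) output it returns the inner ParDo's existing output instead of building a second PCollection with a different producer. The toy model below captures that behaviour; `Output` and `OutputsTupleSketch` are hypothetical classes, the `__getitem__` entry point is an assumption, and the `__str__` format copies the new `"%s[%s.%s]"` pattern from `pvalue.py`.

```python
# Toy model of the caching in the DoOutputsTuple change. Class names and
# the __getitem__ entry point are hypothetical.


class Output(object):
    def __init__(self, producer_label, tag):
        self.producer_label = producer_label
        self.tag = tag

    def __str__(self):
        # Same pattern as the new _str_internal: ClassName[producer.tag].
        return '%s[%s.%s]' % (
            type(self).__name__, self.producer_label, self.tag)


class OutputsTupleSketch(object):
    def __init__(self, producer_label, main_output):
        self._producer_label = producer_label
        self._main_output = main_output  # output of the inner step
        self._outputs = {}
        self.side_output_tags = set()

    def __getitem__(self, tag):
        # Reuse the object created on a previous access to this tag.
        if tag in self._outputs:
            return self._outputs[tag]
        if tag is not None:
            self.side_output_tags.add(tag)
            output = Output(self._producer_label, tag)
        else:
            # Main output: reuse the existing one rather than re-creating it.
            output = self._main_output
        self._outputs[tag] = output
        return output
```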
[2/2] incubator-beam git commit: Closes #658
Closes #658 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/762a2930 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/762a2930 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/762a2930 Branch: refs/heads/python-sdk Commit: 762a2930a093c19580d9f631b923c126aceccc02 Parents: 7c0c27a e2eb3b5 Author: Robert Bradshaw Authored: Thu Jul 14 11:43:40 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 14 11:43:40 2016 -0700 -- sdks/python/apache_beam/pvalue.py | 28 1 file changed, 16 insertions(+), 12 deletions(-) --
[GitHub] incubator-beam pull request #658: DoOutputsTuple cleanup
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/658

DoOutputsTuple cleanup

Be sure to do all of the following to help us incorporate your contribution quickly and easily:

 - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request`
 - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
 - [ ] Replace `` in the title with the actual Jira issue number, if there is one.
 - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---

This avoids the same (logical) PCollection from being re-created with a different producer.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner-pvalues

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/658.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #658

commit e2eb3b520ce742bbf4aef63046ed2dfdb96c32e5
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-14T18:03:41Z

    DoOutputsTuple cleanup

    This avoids the same (logical) PCollection from being re-created with a different producer.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] incubator-beam pull request #657: Accept runners by fully qualified name.
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/657

Accept runners by fully qualified name.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam runner

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/657.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #657

commit 683262903817e012efbc649d8fe73532876b7c5b
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-14T17:53:09Z

    Accept runners by fully qualified name.
[3/3] incubator-beam git commit: Closes #657
Closes #657

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3b695068
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3b695068
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3b695068

Branch: refs/heads/python-sdk
Commit: 3b69506897db7f21cc09eaa5ac8c08e4ee15ad2c
Parents: 762a293 c055e84
Author: Robert Bradshaw
Authored: Thu Jul 14 14:16:09 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 14 14:16:09 2016 -0700

--
 .../apache_beam/runners/dataflow_runner.py | 4 ++
 sdks/python/apache_beam/runners/runner.py | 39 ++--
 sdks/python/apache_beam/runners/runner_test.py | 2 +-
 3 files changed, 24 insertions(+), 21 deletions(-)
--
[1/3] incubator-beam git commit: Accept runners by fully qualified name.
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 762a2930a -> 3b6950689

Accept runners by fully qualified name.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fe8954
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fe8954
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fe8954

Branch: refs/heads/python-sdk
Commit: 84fe8954669ef9a30448d140b5a578c14b863819
Parents: 762a293
Author: Robert Bradshaw
Authored: Thu Jul 14 10:53:09 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 14 14:15:59 2016 -0700

--
 sdks/python/apache_beam/runners/runner.py | 28 ++---
 sdks/python/apache_beam/runners/runner_test.py | 2 +-
 2 files changed, 14 insertions(+), 16 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner.py
--
diff --git a/sdks/python/apache_beam/runners/runner.py b/sdks/python/apache_beam/runners/runner.py
index 55b63f3..98f9758 100644
--- a/sdks/python/apache_beam/runners/runner.py
+++ b/sdks/python/apache_beam/runners/runner.py
@@ -41,26 +41,24 @@ def create_runner(runner_name):
     RuntimeError: if an invalid runner name is used.
   """
   # pylint: disable=wrong-import-order, wrong-import-position
-  if runner_name == 'DirectPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DirectPipelineRunner()
-  if runner_name == 'DiskCachedPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.DiskCachedPipelineRunner(
-    )
-  if runner_name == 'EagerPipelineRunner':
-    import apache_beam.runners.direct_runner
-    return apache_beam.runners.direct_runner.EagerPipelineRunner()
-  elif runner_name in ('DataflowPipelineRunner',
-                       'BlockingDataflowPipelineRunner'):
+  if runner_name in ('DirectPipelineRunner', 'DiskCachedPipelineRunner',
+                     'EagerPipelineRunner'):
+    runner_name = 'apache_beam.runners.direct_runner.' + runner_name
+
+  if runner_name in ('DataflowPipelineRunner',
+                     'BlockingDataflowPipelineRunner'):
     import apache_beam.runners.dataflow_runner
     return apache_beam.runners.dataflow_runner.DataflowPipelineRunner(
         blocking=runner_name == 'BlockingDataflowPipelineRunner')
+  elif '.' in runner_name:
+    module, runner = runner_name.rsplit('.', 1)
+    return getattr(__import__(module, {}, {}, [runner], -1), runner)()
   else:
-    raise RuntimeError(
+    raise ValueError(
         'Unexpected pipeline runner: %s. Valid values are '
-        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, or '
-        'BlockingDataflowPipelineRunner.' % runner_name)
+        'DirectPipelineRunner, DataflowPipelineRunner, EagerPipelineRunner, '
+        'BlockingDataflowPipelineRunner or the fully qualified name of '
+        'a PipelineRunner subclass.' % runner_name)


 class PipelineRunner(object):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fe8954/sdks/python/apache_beam/runners/runner_test.py
--
diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py
index 20a7259..d2e70d7 100644
--- a/sdks/python/apache_beam/runners/runner_test.py
+++ b/sdks/python/apache_beam/runners/runner_test.py
@@ -44,7 +44,7 @@ class RunnerTest(unittest.TestCase):
     self.assertTrue(
         isinstance(create_runner('BlockingDataflowPipelineRunner'),
                    DataflowPipelineRunner))
-    self.assertRaises(RuntimeError, create_runner, 'xyz')
+    self.assertRaises(ValueError, create_runner, 'xyz')

   def test_remote_runner_translation(self):
     remote_runner = DataflowPipelineRunner()
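The new `elif '.' in runner_name` branch resolves a dotted path by splitting off the last component, importing the module, and looking the class up on it. The `__import__(module, {}, {}, [runner], -1)` form relies on Python 2's implicit relative-import level `-1`; on Python 3 the same resolution can be written with `importlib.import_module`. The sketch below is a hedged illustration, not the SDK's code: `create_by_name` and its alias table are hypothetical, and `collections.OrderedDict` only stands in for a `PipelineRunner` subclass so that the example runs without Beam installed.

```python
import importlib

# Hypothetical alias table: short names are expanded to fully qualified ones,
# much as create_runner rewrites 'DirectPipelineRunner' into a dotted path.
_ALIASES = {
    'OrderedDict': 'collections.OrderedDict',
}


def create_by_name(name):
    """Instantiate a class from a short alias or a fully qualified dotted name."""
    name = _ALIASES.get(name, name)
    if '.' not in name:
        raise ValueError('Unexpected name: %s. Use a known alias or a fully '
                         'qualified class name.' % name)
    module_name, class_name = name.rsplit('.', 1)
    module = importlib.import_module(module_name)
    return getattr(module, class_name)()


print(type(create_by_name('collections.OrderedDict')).__name__)  # OrderedDict
```

Raising `ValueError` rather than `RuntimeError` for an unknown name mirrors the test change in the diff: the caller passed a bad argument, so nothing went wrong at runtime.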
[1/2] incubator-beam git commit: Closes #653
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 3b6950689 -> c8cef2cba

Closes #653

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c8cef2cb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c8cef2cb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c8cef2cb

Branch: refs/heads/python-sdk
Commit: c8cef2cbac10b2a3be79b7087c96e254ce7aedd3
Parents: 3b69506 afc68bc
Author: Robert Bradshaw
Authored: Thu Jul 14 14:18:43 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 14 14:18:43 2016 -0700

--
 sdks/python/apache_beam/coders/coders_test_common.py | 4 ++--
 .../apache_beam/examples/complete/top_wikipedia_sessions.py | 6 +++---
 sdks/python/apache_beam/transforms/timeutil.py | 5 ++---
 3 files changed, 7 insertions(+), 8 deletions(-)
--
[2/2] incubator-beam git commit: Fix min and max timestamp on 32-bit machines
Fix min and max timestamp on 32-bit machines

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/afc68bc3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/afc68bc3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/afc68bc3

Branch: refs/heads/python-sdk
Commit: afc68bc3aee9cafbe865628a9683bffdd73853ec
Parents: 3b69506
Author: Charles Chen
Authored: Thu Jul 14 00:39:01 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 14 14:18:43 2016 -0700

--
 sdks/python/apache_beam/coders/coders_test_common.py | 4 ++--
 .../apache_beam/examples/complete/top_wikipedia_sessions.py | 6 +++---
 sdks/python/apache_beam/transforms/timeutil.py | 5 ++---
 3 files changed, 7 insertions(+), 8 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/afc68bc3/sdks/python/apache_beam/coders/coders_test_common.py
--
diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py
index 07436cb..0266fdc 100644
--- a/sdks/python/apache_beam/coders/coders_test_common.py
+++ b/sdks/python/apache_beam/coders/coders_test_common.py
@@ -19,7 +19,6 @@

 import logging
 import math
-import sys
 import unittest

 import dill
@@ -121,9 +120,10 @@ class CodersTest(unittest.TestCase):
     # Multi-byte encoding starts at 128
     self.check_coder(coders.VarIntCoder(), *range(120, 140))
     # Large values
+    MAX_64_BIT_INT = 0x7fff
     self.check_coder(coders.VarIntCoder(),
                      *[int(math.pow(-1, k) * math.exp(k))
-                       for k in range(0, int(math.log(sys.maxint)))])
+                       for k in range(0, int(math.log(MAX_64_BIT_INT)))])

   def test_float_coder(self):
     self.check_coder(coders.FloatCoder(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/afc68bc3/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
--
diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
index 55b7857..7337910 100644
--- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
+++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py
@@ -42,7 +42,6 @@ from __future__ import absolute_import
 import argparse
 import json
 import logging
-import sys

 import apache_beam as beam
 from apache_beam import combiners
@@ -50,6 +49,7 @@ from apache_beam import window

 ONE_HOUR_IN_SECONDS = 3600
 THIRTY_DAYS_IN_SECONDS = 30 * 24 * ONE_HOUR_IN_SECONDS
+MAX_TIMESTAMP = 0x7fff


 class ExtractUserAndTimestampDoFn(beam.DoFn):
@@ -128,8 +128,8 @@ class ComputeTopSessions(beam.PTransform):
     return (pcoll
             | beam.ParDo('ExtractUserAndTimestamp',
                          ExtractUserAndTimestampDoFn())
-            | beam.Filter(
-                lambda x: abs(hash(x)) <= sys.maxint * self.sampling_threshold)
+            | beam.Filter(lambda x: (abs(hash(x)) <=
+                                     MAX_TIMESTAMP * self.sampling_threshold))
             | ComputeSessions()
             | beam.ParDo('SessionsToStrings', SessionsToStringsDoFn())
             | TopPerMonth()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/afc68bc3/sdks/python/apache_beam/transforms/timeutil.py
--
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index f72a9e4..4092b60 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -23,7 +23,6 @@ from abc import ABCMeta
 from abc import abstractmethod

 import datetime
-import sys


 class Timestamp(object):
@@ -115,8 +114,8 @@ class Timestamp(object):
     return Duration(micros=self.micros % other.micros)


-MIN_TIMESTAMP = Timestamp(micros=-sys.maxint - 1)
-MAX_TIMESTAMP = Timestamp(micros=sys.maxint)
+MIN_TIMESTAMP = Timestamp(micros=-0x7fff - 1)
+MAX_TIMESTAMP = Timestamp(micros=0x7fff)


 class Duration(object):
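On Python 2, `sys.maxint` equals the interpreter's native word size limit, so a 32-bit build reports 2**31 - 1. Read as microseconds, that bound caps timestamps roughly 36 minutes after the epoch. Pinning the bounds to signed 64-bit values makes them independent of the platform. The arithmetic is sketched below. The constant and function names are illustrative assumptions, not the SDK's.

```python
MICROS_PER_SECOND = 10 ** 6

# Signed 64-bit bounds, independent of the host's native word size.
MAX_INT64 = (1 << 63) - 1
MIN_INT64 = -MAX_INT64 - 1

# What sys.maxint evaluated to on a 32-bit Python 2 build.
MAX_INT32 = (1 << 31) - 1


def max_seconds_after_epoch(max_micros):
    """Whole seconds after the epoch reachable by a microsecond bound."""
    return max_micros // MICROS_PER_SECOND


print(max_seconds_after_epoch(MAX_INT32))  # 2147 seconds, about 36 minutes
print(max_seconds_after_epoch(MAX_INT64))  # about 292,000 years of seconds
```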
[1/2] incubator-beam git commit: Update some of the example tests to use assert_that
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk c8cef2cba -> a1a51c3c1

Update some of the example tests to use assert_that

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/84fef464
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/84fef464
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/84fef464

Branch: refs/heads/python-sdk
Commit: 84fef464b0abea41e318c0fe983ac43874e5f6ad
Parents: c8cef2c
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jul 13 16:28:46 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Thu Jul 14 17:37:02 2016 -0700

--
 .../examples/complete/autocomplete_test.py | 34 ++---
 .../examples/complete/estimate_pi.py | 19 +++--
 .../examples/complete/estimate_pi_test.py | 36 +-
 .../examples/cookbook/coders_test.py | 33 +++--
 .../examples/cookbook/custom_ptransform.py | 74 ++--
 .../examples/cookbook/custom_ptransform_test.py | 41 ---
 .../examples/cookbook/group_with_coder_test.py | 70 --
 sdks/python/apache_beam/transforms/util.py | 13
 8 files changed, 139 insertions(+), 181 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/autocomplete_test.py
--
diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
index bd0a6cb..1b3ee5f 100644
--- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py
+++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py
@@ -15,43 +15,17 @@
 # limitations under the License.
 #

-"""Test for the wordcount example."""
+"""Test for the autocomplete example."""

-import collections
 import unittest

-
 import apache_beam as beam
 from apache_beam.examples.complete import autocomplete
-from apache_beam.pvalue import AsIter
-
-# TODO(robertwb): Move to testing utilities.
-
-
-def assert_that(pcoll, matcher):
-  """Asserts that the give PCollection satisfies the constraints of the matcher
-  in a way that is runnable locally or on a remote service.
-  """
-  singleton = pcoll.pipeline | beam.Create('create_singleton', [None])
-
-  def check_matcher(_, side_value):
-    assert matcher(side_value)
-    return []
-  singleton | beam.FlatMap(check_matcher, AsIter(pcoll))  # pylint: disable=expression-not-assigned
-
-
-def contains_in_any_order(expected):
-  def matcher(value):
-    vs = collections.Counter(value)
-    es = collections.Counter(expected)
-    if vs != es:
-      raise ValueError(
-          'extra: %s, missing: %s' % (vs - es, es - vs))
-    return True
-  return matcher
+from apache_beam.transforms.util import assert_that
+from apache_beam.transforms.util import contains_in_any_order


-class WordCountTest(unittest.TestCase):
+class AutocompleteTest(unittest.TestCase):

   WORDS = ['this', 'this', 'that', 'to', 'to', 'to']

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/84fef464/sdks/python/apache_beam/examples/complete/estimate_pi.py
--
diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py
index 8b0f202..3c4a2d9 100644
--- a/sdks/python/apache_beam/examples/complete/estimate_pi.py
+++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py
@@ -85,6 +85,20 @@ class JsonCoder(object):
     return json.dumps(x)


+class EstimatePiTransform(beam.PTransform):
+  """Runs 10M trials, and combine the results to estimate pi."""
+
+  def __init__(self, label):
+    super(EstimatePiTransform, self).__init__(label)
+
+  def apply(self, pcoll):
+    # A hundred work items of a hundred thousand tries each.
+    return (pcoll
+            | beam.Create('Initialize', [10] * 100).with_output_types(int)
+            | beam.Map('Run trials', run_trials)
+            | beam.CombineGlobally('Sum', combine_results).without_defaults())
+
+
 def run(argv=None):
   parser = argparse.ArgumentParser()

@@ -94,11 +108,8 @@ def run(argv=None):
   known_args, pipeline_args = parser.parse_known_args(argv)

   p = beam.Pipeline(argv=pipeline_args)
-  # A thousand work items of a million tries each.
   (p  # pylint: disable=expression-not-assigned
-   | beam.Create('Initialize', [10] * 100).with_output_types(int)
-   | beam.Map('Run trials', run_trials)
-   | beam.CombineGlobally('Sum', combine_results).without_d
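`EstimatePiTransform` chains three steps. `Create` emits one trial count per work item, `Map` runs the trials for each item, and `CombineGlobally` sums the results. This diff does not show the bodies of `run_trials` and `combine_results`, so the plain-Python sketch below uses hypothetical versions of both and does not use Beam. One design point does carry over: Beam may re-apply a callable passed to `CombineGlobally` to its own partial results. For that reason `combine_results` here returns the same `(total, inside)` shape it consumes, and the division into an estimate happens in a separate step.

```python
import random


def run_trials(num_trials, rng=random):
    """Hypothetical trial step: throw darts at the unit square.

    Returns (total, inside): how many points were drawn and how many fell
    inside the quarter circle of radius 1.
    """
    inside = 0
    for _ in range(num_trials):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1.0:
            inside += 1
    return num_trials, inside


def combine_results(results):
    """Hypothetical combine step: sum (total, inside) pairs.

    The output has the same shape as each input, so the function can be
    re-applied to partial sums in any grouping.
    """
    total = inside = 0
    for t, i in results:
        total += t
        inside += i
    return total, inside


def estimate_pi(combined):
    """The quarter circle covers pi/4 of the unit square."""
    total, inside = combined
    return 4.0 * inside / total


random.seed(0)
work_items = [2000] * 100  # 100 work items of 2,000 trials each
print(estimate_pi(combine_results(map(run_trials, work_items))))
```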
[GitHub] incubator-beam pull request #706: Simple DoFnRunner optimizations.
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/706

Simple DoFnRunner optimizations.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam no-logging

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/706.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #706

commit f9da1cde8d4f7828f0d39d04e84e4bdda20093e2
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-21T19:08:33Z

    Remove expensive per-element-step logging context.

    This is 3-4% of the total runtime for something that is rarely used. We don't do this in Java either.

commit c415c229e22012574be56c53e2df2c46d5c37e08
Author: Robert Bradshaw <rober...@google.com>
Date:   2016-07-21T19:11:03Z

    Cache dofn.process method.

    Saves another couple percent.
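The second commit caches the bound `process` method instead of looking it up for every element. Each `obj.method` access repeats an attribute lookup, and on older CPython versions such as the Python 2 interpreter used here it also allocates a new bound-method object. Hoisting the lookup out of the per-element loop removes that work from the hot path. The sketch below shows the pattern with hypothetical classes (`Doubler`, `CachingRunner`). It is not the actual `DoFnRunner` code.

```python
class Doubler(object):
    """Hypothetical DoFn-like object."""

    def process(self, element):
        return [element * 2]


class CachingRunner(object):
    """Hypothetical runner that resolves fn.process once, at construction."""

    def __init__(self, fn):
        self.fn = fn
        # One lookup here instead of one per element in process_all.
        self.fn_process = fn.process

    def process_all(self, elements):
        process = self.fn_process  # a local name is the cheapest lookup in a loop
        results = []
        for element in elements:
            results.extend(process(element))
        return results


print(CachingRunner(Doubler()).process_all([1, 2, 3]))  # [2, 4, 6]
```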
[3/4] incubator-beam git commit: Allow passing logging context directly.
Allow passing logging context directly.

This is better than passing a logger module with a specific class.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de35f282
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de35f282
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de35f282

Branch: refs/heads/python-sdk
Commit: de35f28294dfb4f47b429635eff843b38279e240
Parents: d20cf64
Author: Robert Bradshaw
Authored: Sat Jul 23 00:54:11 2016 -0700
Committer: Robert Bradshaw
Committed: Mon Jul 25 11:57:42 2016 -0700

--
 sdks/python/apache_beam/runners/common.py | 10 --
 1 file changed, 8 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de35f282/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 9c9ab22..c017704 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -62,7 +62,9 @@ class DoFnRunner(Receiver):
                context,
                tagged_receivers,
                logger=None,
-               step_name=None):
+               step_name=None,
+               # Preferred alternative to logger
+               logging_context=None):
     if not args and not kwargs:
       self.dofn = fn
       self.dofn_process = fn.process
@@ -85,9 +87,13 @@ class DoFnRunner(Receiver):
     self.window_fn = windowing.windowfn
     self.context = context
     self.tagged_receivers = tagged_receivers
-    self.logging_context = get_logging_context(logger, step_name=step_name)
     self.step_name = step_name

+    if logging_context:
+      self.logging_context = logging_context
+    else:
+      self.logging_context = get_logging_context(logger, step_name=step_name)
+
     # Optimize for the common case.
     self.main_receivers = as_receiver(tagged_receivers[None])
[4/4] incubator-beam git commit: Closes #721
Closes #721

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/153916fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/153916fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/153916fe

Branch: refs/heads/python-sdk
Commit: 153916fe9f641f5fe5e8d473caef67f449ee6bca
Parents: 38d9dea de35f28
Author: Robert Bradshaw
Authored: Mon Jul 25 12:02:02 2016 -0700
Committer: Robert Bradshaw
Committed: Mon Jul 25 12:02:02 2016 -0700

--
 sdks/python/apache_beam/runners/common.pxd | 30 ---
 sdks/python/apache_beam/runners/common.py | 72 ++---
 2 files changed, 86 insertions(+), 16 deletions(-)
--
[GitHub] incubator-beam pull request #721: Make DoFnRunner a Receiver.
GitHub user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/721
[GitHub] incubator-beam pull request #741: Top improvements
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/741

Top improvements

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam top

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/741.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #741

commit 9151fcf861704c550c21f411a9047f955f179ec4
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2016-07-27T17:09:49Z

    Better top implementation.

    When selecting the top k of n, it is common that k << n. Using a heap is O(n log k) while select algorithms can achieve O(n + k log k).

    This also avoids the ugliness that heapq does not take the comparator as an argument, resulting in _HeapItem classes that were cumbersome and expensive to serialize.

commit e38d6f60f18bf6ad61f2d387c67efa2012af2e14
Author: Robert Bradshaw <rober...@gmail.com>
Date:   2016-07-27T19:23:02Z

    Allow Top operations to take key argument rather than compare, and order by the natural ordering if neither is specified.
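The second commit lets `Top` take a `key` instead of a `compare` function and fall back to natural ordering when neither is given. The sketch below shows those three modes with the standard library rather than Beam. `heapq.nlargest` already accepts `key`, and `functools.cmp_to_key` can adapt an old-style less-than predicate, which is the form `TopCombineFn` receives as `compare`. The `top` helper is hypothetical and is not the Beam API.

```python
import functools
import heapq


def top(n, elements, key=None, compare=None):
    """Return the n largest elements, largest first.

    Hypothetical helper: ordering comes from `key` if given, from a
    less-than predicate `compare(a, b)` if given, else natural ordering.
    """
    if key is not None and compare is not None:
        raise ValueError('Specify key or compare, not both.')
    if compare is not None:
        # Turn the less-than predicate into a three-way cmp, then into a key.
        key = functools.cmp_to_key(
            lambda a, b: -1 if compare(a, b) else (1 if compare(b, a) else 0))
    return heapq.nlargest(n, elements, key=key)


print(top(2, [3, 1, 4, 1, 5]))                        # [5, 4]
print(top(2, ['aa', 'b', 'ccc'], key=len))            # ['ccc', 'aa']
print(top(2, [3, 1, 4], compare=lambda a, b: a > b))  # [1, 3]
```

A `key` is typically cheaper than a comparator: it is called once per element, while a comparator is called once per comparison.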
[2/2] incubator-beam git commit: Closes #726
Closes #726

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/53ab635c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/53ab635c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/53ab635c

Branch: refs/heads/python-sdk
Commit: 53ab635c75e4b94b3930a601911a717ddc499efe
Parents: 26ff657 2f4054b
Author: Robert Bradshaw
Authored: Thu Jul 28 11:05:56 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 28 11:05:56 2016 -0700

--
 sdks/python/apache_beam/io/fileio.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--
[1/2] incubator-beam git commit: Make TextFileReader observable
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 26ff65795 -> 53ab635c7

Make TextFileReader observable

This allows future implementation of size tracking for elements in side input sources.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2f4054ba
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2f4054ba
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2f4054ba

Branch: refs/heads/python-sdk
Commit: 2f4054ba37da1c1100f45a572d96e7a6e2e60152
Parents: 26ff657
Author: Charles Chen
Authored: Mon Jul 25 11:44:22 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 28 11:05:55 2016 -0700

--
 sdks/python/apache_beam/io/fileio.py | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2f4054ba/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 3afaae8..b1e091b 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -745,10 +745,12 @@ class NativeTextFileSink(iobase.NativeSink):
 # TextFileReader, TextMultiFileReader.


-class TextFileReader(iobase.NativeSourceReader):
+class TextFileReader(iobase.NativeSourceReader,
+                     coders.observable.ObservableMixin):
   """A reader for a text file source."""

   def __init__(self, source):
+    super(TextFileReader, self).__init__()
     self.source = source
     self.start_offset = self.source.start_offset or 0
     self.end_offset = self.source.end_offset
@@ -778,6 +780,7 @@ class TextFileReader(iobase.NativeSourceReader):
       self._file.seek(self.start_offset - 1)
       self.current_offset -= 1
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
     else:
       self._file.seek(self.start_offset)
@@ -801,6 +804,7 @@ class TextFileReader(iobase.NativeSourceReader):
         # a dynamic split request from the service.
         return
       line = self._file.readline()
+      self.notify_observers(line, is_encoded=True)
       self.current_offset += len(line)
       if self.source.strip_trailing_newlines:
         line = line.rstrip('\n')
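Mixing `ObservableMixin` into `TextFileReader` lets other components subscribe to each raw line as it is read, which is what future size tracking for side-input elements needs. The diff only shows the call `notify_observers(line, is_encoded=True)`. The mixin's other methods do not appear in this chunk, so the minimal observer sketch below was written independently of the SDK, and its names (`register_observer`, `LineReader`) are assumptions rather than the SDK's API. In this sketch, skipping the `super().__init__()` call would leave the observer list uncreated. That is presumably why the diff adds such a call to `TextFileReader.__init__`.

```python
import io


class ObservableMixin(object):
    """Minimal observer mixin (a sketch; not Beam's implementation)."""

    def __init__(self):
        super(ObservableMixin, self).__init__()
        self._observers = []

    def register_observer(self, callback):
        self._observers.append(callback)

    def notify_observers(self, value, **kwargs):
        for callback in self._observers:
            callback(value, **kwargs)


class LineReader(ObservableMixin):
    """Hypothetical reader that reports each raw (encoded) line to observers."""

    def __init__(self, stream):
        super(LineReader, self).__init__()
        self._stream = stream

    def __iter__(self):
        for line in self._stream:
            # Observers see the raw bytes before the newline is stripped.
            self.notify_observers(line, is_encoded=True)
            yield line.rstrip(b'\n')


sizes = []
reader = LineReader(io.BytesIO(b'ab\ncde\n'))
reader.register_observer(lambda value, is_encoded: sizes.append(len(value)))
print(list(reader), sizes)  # [b'ab', b'cde'] [3, 4]
```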
[3/3] incubator-beam git commit: Closes #741
Closes #741

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b4716d9d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b4716d9d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b4716d9d

Branch: refs/heads/python-sdk
Commit: b4716d9dcf9c3f198fc72e181adc0bae8b6fa818
Parents: 53ab635 866a09d
Author: Robert Bradshaw
Authored: Thu Jul 28 11:08:06 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 28 11:08:06 2016 -0700

--
 sdks/python/apache_beam/transforms/combiners.py | 201 ---
 .../apache_beam/transforms/combiners_test.py | 42 +++-
 2 files changed, 164 insertions(+), 79 deletions(-)
--
[1/3] incubator-beam git commit: Better top implementation.
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 53ab635c7 -> b4716d9dc

Better top implementation.

When selecting the top k of n, it is common that k << n. Using a heap is O(n log k) while select algorithms can achieve O(n + k log k).

This also avoids the ugliness that heapq does not take the comparator as an argument, resulting in _HeapItem classes that were cumbersome and expensive to serialize.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/adb3ed93
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/adb3ed93
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/adb3ed93

Branch: refs/heads/python-sdk
Commit: adb3ed93053c83b4e28e7baa879e9aee82f02785
Parents: 53ab635
Author: Robert Bradshaw <rober...@gmail.com>
Authored: Wed Jul 27 10:09:49 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Jul 28 11:08:05 2016 -0700

--
 sdks/python/apache_beam/transforms/combiners.py | 111 +--
 1 file changed, 51 insertions(+), 60 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/adb3ed93/sdks/python/apache_beam/transforms/combiners.py
--
diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py
index 8c56e5a..453c0f8 100644
--- a/sdks/python/apache_beam/transforms/combiners.py
+++ b/sdks/python/apache_beam/transforms/combiners.py
@@ -228,76 +228,67 @@ class TopCombineFn(core.CombineFn):
   apply call become additional arguments to the comparator.
   """

-  # Actually pickling the comparison operators (including, often, their
-  # entire globals) can be very expensive.  Instead refer to them by index
-  # in this dictionary, which is populated on construction (including
-  # unpickling).
-  compare_by_id = {}
-
-  def __init__(self, n, compare, _compare_id=None):  # pylint: disable=invalid-name
+  # TODO(robertwb): Allow taking a key rather than a compare.
+  def __init__(self, n, compare):
     self._n = n
+    self._buffer_size = min(2 * n, n + 1000)
     self._compare = compare
-    self._compare_id = _compare_id or id(compare)
-    TopCombineFn.compare_by_id[self._compare_id] = self._compare
-
-  def __reduce_ex__(self, _):
-    return TopCombineFn, (self._n, self._compare, self._compare_id)

-  class _HeapItem(object):
-    """A wrapper for values supporting arbitrary comparisons.
-
-    The heap implementation supplied by Python is a min heap that always uses
-    the __lt__ operator if one is available. This wrapper overloads __lt__,
-    letting us specify arbitrary precedence for elements in the PCollection.
-    """
+  def create_accumulator(self, *args, **kwargs):
+    return None, []

-    def __init__(self, item, compare_id, *args, **kwargs):
-      # item: wrapped item.
-      # compare: an implementation of the pairwise < operator.
-      # args, kwargs: extra arguments supplied to the compare function.
-      self.item = item
-      self.compare_id = compare_id
-      self.args = args
-      self.kwargs = kwargs
+  def add_input(self, accumulator, element, *args, **kwargs):
+    if args or kwargs:
+      lt = lambda a, b: self._compare(a, b, *args, **kwargs)
+    else:
+      lt = self._compare

-    def __lt__(self, other):
-      return TopCombineFn.compare_by_id[self.compare_id](
-          self.item, other.item, *self.args, **self.kwargs)
+    threshold, buffer = accumulator
+    if len(buffer) < self._n:
+      if not buffer:
+        return element, [element]
+      else:
+        buffer.append(element)
+        if lt(element, threshold):  # element < threshold
+          return element, buffer
+        else:
+          return accumulator  # with mutated buffer
+    elif lt(threshold, element):  # threshold < element
+      buffer.append(element)
+      if len(buffer) < self._buffer_size:
+        return accumulator
+      else:
+        buffer.sort(cmp=lambda a, b: (not lt(a, b)) - (not lt(b, a)))
+        return buffer[-self._n], buffer[-self._n:]
+    else:
+      return accumulator

-  def create_accumulator(self, *args, **kwargs):
-    return []  # Empty heap.
-
-  def add_input(self, heap, element, *args, **kwargs):
-    # Note that because heap is a min heap, heappushpop will discard incoming
-    # elements that are lesser (according to compare) than those in the heap
-    # (since that's what you would get if you pushed a small element on and
-    # popped the smallest element off). So, filtering a collection with a
-    # min-heap gives you the largest elements in the collection.
-    item = self._HeapItem(element, self._compare_i
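The new `add_input` keeps a buffer of at most `min(2 * n, n + 1000)` elements together with a threshold, the smallest element known to be in the running top n. An element that does not beat the threshold is dropped at once. When the buffer fills, it is sorted, truncated back to n, and the threshold is raised. A standalone sketch of that idea with natural ordering follows. `top_k` is a hypothetical name, and like the diff it sorts the buffer on each flush instead of running a linear-time selection algorithm.

```python
def top_k(elements, k):
    """Return the k largest elements, largest first, using a bounded buffer."""
    if k <= 0:
        return []
    buffer_size = min(2 * k, k + 1000)
    threshold = None  # smallest element currently known to be in the top k
    buf = []
    for element in elements:
        if len(buf) < k:
            # Still filling: keep everything and track the minimum.
            buf.append(element)
            if threshold is None or element < threshold:
                threshold = element
        elif threshold < element:
            buf.append(element)
            if len(buf) >= buffer_size:
                # Flush: keep only the k largest and raise the threshold.
                buf.sort()
                buf = buf[-k:]
                threshold = buf[0]
        # Any other element cannot change the result and is dropped.
    buf.sort(reverse=True)
    return buf[:k]


print(top_k([5, 1, 9, 3, 7, 9], 3))  # [9, 9, 7]
```

Because each flush keeps exactly k survivors and the threshold only rises, most elements of a long input are discarded after a single comparison when k is much smaller than n.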
[1/2] incubator-beam git commit: Optimize Map and Flatmap when there are no side inputs.
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk b4716d9dc -> 351c3831d

Optimize Map and Flatmap when there are no side inputs.

varargs and kwargs are expensive, even when they're empty. This is especially true for otherwise one-argument Python calls which are special cased in CPython.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7d2fb1f8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7d2fb1f8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7d2fb1f8

Branch: refs/heads/python-sdk
Commit: 7d2fb1f88d1a2370dd4053f3a1738cbb9838cc2f
Parents: b4716d9
Author: Robert Bradshaw <rober...@google.com>
Authored: Wed Jul 27 18:29:59 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Thu Jul 28 11:09:48 2016 -0700

--
 sdks/python/apache_beam/transforms/core.py | 40 ++---
 1 file changed, 30 insertions(+), 10 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7d2fb1f8/sdks/python/apache_beam/transforms/core.py
--
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 5e6aafc..38b9cd2 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -20,6 +20,8 @@
 from __future__ import absolute_import
 
 import copy
+import inspect
+import types
 
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -194,6 +196,16 @@ class DoFn(WithTypeHints):
     return type_hint
 
 
+def _fn_takes_side_inputs(fn):
+  try:
+    argspec = inspect.getargspec(fn)
+  except TypeError:
+    # We can't tell; maybe it does.
+    return True
+  is_bound = isinstance(fn, types.MethodType) and fn.im_self is not None
+  return len(argspec.args) > 1 + is_bound or argspec.varargs or argspec.keywords
+
+
 class CallableWrapperDoFn(DoFn):
   """A DoFn (function) object wrapping a callable object.
@@ -214,6 +226,11 @@ class CallableWrapperDoFn(DoFn):
       raise TypeError('Expected a callable object instead of: %r' % fn)
 
     self._fn = fn
+    if _fn_takes_side_inputs(fn):
+      self.process = lambda context, *args, **kwargs: fn(
+          context.element, *args, **kwargs)
+    else:
+      self.process = lambda context: fn(context.element)
 
     super(CallableWrapperDoFn, self).__init__()
 
@@ -237,9 +254,6 @@ class CallableWrapperDoFn(DoFn):
     return self._strip_output_annotations(
         trivial_inference.infer_return_type(self._fn, [input_type]))
 
-  def process(self, context, *args, **kwargs):
-    return self._fn(context.element, *args, **kwargs)
-
   def process_argspec_fn(self):
     return getattr(self._fn, '_argspec_fn', self._fn)
 
@@ -676,7 +690,10 @@ def Map(fn_or_label, *args, **kwargs): # pylint: disable=invalid-name
         'Map can be used only with callable objects. '
         'Received %r instead for %s argument.'
         % (fn, 'first' if label is None else 'second'))
-  wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+  if _fn_takes_side_inputs(fn):
+    wrapper = lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
+  else:
+    wrapper = lambda x: [fn(x)]
 
   # Proxy the type-hint information from the original function to this new
   # wrapped function.
@@ -1008,21 +1025,24 @@ class GroupByKey(PTransform):
       value_type = windowed_value_iter_type.inner_type.inner_type
       return Iterable[KV[key_type, Iterable[value_type]]]
 
-    def process(self, context):
-      k, vs = context.element
+    def start_bundle(self, context):
       # pylint: disable=wrong-import-order, wrong-import-position
       from apache_beam.transforms.trigger import InMemoryUnmergedState
       from apache_beam.transforms.trigger import create_trigger_driver
       # pylint: enable=wrong-import-order, wrong-import-position
-      driver = create_trigger_driver(self.windowing, True)
-      state = InMemoryUnmergedState()
+      self.driver = create_trigger_driver(self.windowing, True)
+      self.state_type = InMemoryUnmergedState
+
+    def process(self, context):
+      k, vs = context.element
+      state = self.state_type()
       # TODO(robertwb): Conditionally process in smaller chunks.
-      for wvalue in driver.process_elements(state, vs, MIN_TIMESTAMP):
+      for wvalue in self.driver.process_elements(state, vs, MIN_TIMESTAMP):
         yield wvalue.with_value((k, wvalue.value))
       while state.timers:
         fired = state.get_and_clear_timers()
         for timer_window, (name, time_domain, fire_time) in fired:
-          for wvalue in driver.process_timer(
+          for wvalue in self.driver.process_timer(
               timer_window, na
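The `_fn_takes_side_inputs` helper in this commit decides once per callable whether the per-element wrapper needs `*args, **kwargs` at all, so the common one-argument case avoids the overhead the commit message describes. The sketch below restates that check for Python 3, where `inspect.getargspec` is deprecated (removed in 3.11) and bound methods expose `__self__` rather than `im_self`. It uses `inspect.signature`, which already omits `self` for bound methods, so no `is_bound` correction is needed. The names `fn_takes_extra_args` and `make_map_wrapper` are hypothetical, and treating keyword-only parameters as "extra" is a conservative assumption of this sketch.

```python
import inspect


def fn_takes_extra_args(fn):
    """Return True if fn might accept arguments beyond the element itself."""
    try:
        params = inspect.signature(fn).parameters.values()
    except (TypeError, ValueError):
        # Signature unavailable: we can't tell, so assume it might.
        return True
    positional = 0
    for p in params:
        if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD):
            positional += 1
        else:
            # *args, **kwargs, or keyword-only parameters.
            return True
    return positional > 1


def make_map_wrapper(fn):
    """Wrap fn for a Map-like transform, skipping varargs when unused."""
    if fn_takes_extra_args(fn):
        return lambda x, *args, **kwargs: [fn(x, *args, **kwargs)]
    return lambda x: [fn(x)]
```

As in the original, anything whose signature cannot be inspected falls back to the general `*args, **kwargs` path, which is always correct, only slower.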
[GitHub] incubator-beam pull request #748: Optimize Map and Flatmap when there are no...
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/748
[GitHub] incubator-beam pull request #741: Top improvements
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/741
[2/2] incubator-beam git commit: Closes #748
Closes #748

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/351c3831
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/351c3831
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/351c3831

Branch: refs/heads/python-sdk
Commit: 351c3831de1bdcfcb19b2f24f9f0b6a19e77e421
Parents: b4716d9 7d2fb1f
Author: Robert Bradshaw
Authored: Thu Jul 28 11:09:49 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 28 11:09:49 2016 -0700

--
 sdks/python/apache_beam/transforms/core.py | 40 ++---
 1 file changed, 30 insertions(+), 10 deletions(-)
--
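The change merged here also moves trigger-driver construction in `GroupByKey` from `process` into `start_bundle`, storing the state class rather than a state instance. A toy illustration of that pattern in plain Python follows. `ExpensiveDriver` and `GroupedProcessor` are hypothetical stand-ins, not Beam classes, and the sketch assumes, as the diff appears to, that the driver is safe to share across the elements of a bundle while the per-key state is not.

```python
class ExpensiveDriver:
    """Hypothetical stand-in for an object that is costly to construct."""

    instances = 0

    def __init__(self):
        ExpensiveDriver.instances += 1

    def process_elements(self, state, values):
        # Accumulate into the caller-supplied state and report its contents.
        state.extend(values)
        return list(state)


class GroupedProcessor:
    """Toy DoFn-like class: shared machinery per bundle, state per element."""

    def start_bundle(self):
        # Built once per bundle instead of once per element.
        self.driver = ExpensiveDriver()
        # Keep the state *type*; a fresh instance is created for every element.
        self.state_type = list

    def process(self, element):
        key, values = element
        state = self.state_type()
        for value in self.driver.process_elements(state, values):
            yield key, value
```

After `start_bundle()`, processing `('a', [1, 2])` and then `('b', [3])` constructs the driver only once, and because each element still gets a fresh state list, `'b'` yields only `('b', 3)`.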
[04/12] incubator-beam git commit: Fix multi-input named PTransforms.
Fix multi-input named PTransforms.

Now delegate the __ror__ logic entirely for the naming wrapper.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/937cf69e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/937cf69e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/937cf69e

Branch: refs/heads/python-sdk
Commit: 937cf69e958d4a82fb274f311de248930298db69
Parents: 9fe102a
Author: Robert Bradshaw
Authored: Fri Jul 22 14:32:33 2016 -0700
Committer: Robert Bradshaw
Committed: Sat Jul 23 16:43:45 2016 -0700

--
 sdks/python/apache_beam/transforms/ptransform.py | 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/937cf69e/sdks/python/apache_beam/transforms/ptransform.py
--
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index da8b671..b652bca 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -400,7 +400,7 @@ class PTransform(WithTypeHints):
     else:
       return NotImplemented
 
-  def __ror__(self, left):
+  def __ror__(self, left, label=None):
     """Used to apply this PTransform to non-PValues, e.g., a tuple."""
     pvalueish, pvalues = self._extract_input_pvalues(left)
     pipelines = [v.pipeline for v in pvalues if isinstance(v, pvalue.PValue)]
@@ -434,7 +434,7 @@ class PTransform(WithTypeHints):
                     if not isinstance(v, pvalue.PValue) and v is not None}
     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
     self.pipeline = p
-    result = p.apply(self, pvalueish)
+    result = p.apply(self, pvalueish, label)
     if deferred:
       return result
     else:
@@ -720,5 +720,8 @@ class _NamedPTransform(PTransform):
     super(_NamedPTransform, self).__init__(label)
     self.transform = transform
 
+  def __ror__(self, pvalueish):
+    return self.transform.__ror__(pvalueish, self.label)
+
   def apply(self, pvalue):
     raise RuntimeError("Should never be applied directly.")
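The `_NamedPTransform.__ror__` added in this commit is what makes `pcoll | 'label' >> transform` work for non-PValue inputs: `>>` binds more tightly than `|`, so `'label' >> transform` is evaluated first and produces a label wrapper, and the wrapper then hands the whole application back to the wrapped transform together with its label. The toy below reproduces only that operator plumbing in Python 3. `Transform` and `_Named` are hypothetical classes that tag their outputs with the label instead of applying anything to a pipeline; in the toy, the `'label' >> ...` step goes through `__rrshift__` because `str` has no `__rshift__` for it.

```python
class Transform:
    """Hypothetical stand-in for a PTransform: maps fn over its input."""

    def __init__(self, fn):
        self.fn = fn

    def __ror__(self, left, label=None):
        # Reached for `left | transform` (list and tuple have no __or__).
        name = label or type(self).__name__
        return [(name, self.fn(x)) for x in left]

    def __rrshift__(self, label):
        # Reached for `'label' >> transform`, which is evaluated before `|`.
        return _Named(label, self)


class _Named:
    """Label wrapper that delegates application entirely to its transform."""

    def __init__(self, label, transform):
        self.label = label
        self.transform = transform

    def __ror__(self, left):
        return self.transform.__ror__(left, self.label)
```

With this, `[1, 2] | 'double' >> Transform(lambda x: 2 * x)` evaluates to `[('double', 2), ('double', 4)]`, while an unlabeled `[3] | Transform(str)` falls back to the class name.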
[11/12] incubator-beam git commit: Final cleanup pass.
Final cleanup pass. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e3c078fe Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e3c078fe Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e3c078fe Branch: refs/heads/python-sdk Commit: e3c078fe28553b7e7317316b6df51b4c570573ba Parents: c5b5b14 Author: Robert BradshawAuthored: Sat Jul 23 01:32:23 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:46 2016 -0700 -- .../examples/complete/autocomplete.py| 4 ++-- .../examples/complete/autocomplete_test.py | 4 ++-- .../apache_beam/examples/complete/estimate_pi.py | 5 ++--- .../examples/complete/top_wikipedia_sessions.py | 4 ++-- .../complete/top_wikipedia_sessions_test.py | 2 +- .../apache_beam/examples/cookbook/bigshuffle.py | 12 +--- .../examples/cookbook/custom_ptransform.py | 9 + .../examples/cookbook/custom_ptransform_test.py | 2 +- .../apache_beam/examples/cookbook/filters.py | 4 ++-- .../examples/cookbook/group_with_coder.py| 9 + .../examples/cookbook/multiple_output_pardo.py | 14 +++--- .../apache_beam/examples/snippets/snippets.py| 19 +-- .../apache_beam/examples/streaming_wordcap.py| 4 ++-- 13 files changed, 45 insertions(+), 47 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete.py -- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index 10d9009..c3cd88f 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -48,8 +48,8 @@ def run(argv=None): | 'read' >> beam.io.Read(beam.io.TextFileSource(known_args.input)) | 'split' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x)) | 'TopPerPrefix' >> TopPerPrefix(5) - | beam.Map('format', - lambda (prefix, candidates): '%s: %s' % (prefix, 
candidates)) + | 'format' >> beam.Map( + lambda (prefix, candidates): '%s: %s' % (prefix, candidates)) | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.output))) p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/autocomplete_test.py -- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 18d0511..0d20482 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -31,8 +31,8 @@ class AutocompleteTest(unittest.TestCase): def test_top_prefixes(self): p = beam.Pipeline('DirectPipelineRunner') -words = p | 'create' >> beam.Create(self.WORDS) -result = words | 'test' >> autocomplete.TopPerPrefix(5) +words = p | beam.Create(self.WORDS) +result = words | autocomplete.TopPerPrefix(5) # values must be hashable for now result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) assert_that(result, equal_to( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/estimate_pi.py -- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index c33db1d..37c1aad 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -112,9 +112,8 @@ def run(argv=None): p = beam.Pipeline(options=pipeline_options) (p # pylint: disable=expression-not-assigned - | 'Estimate' >> EstimatePiTransform() - | beam.io.Write('Write', - beam.io.TextFileSink(known_args.output, + | EstimatePiTransform() + | beam.io.Write(beam.io.TextFileSink(known_args.output, coder=JsonCoder( # Actually run the pipeline (all operations above are deferred). 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e3c078fe/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py -- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions.py index 7468484..a48a383 100644 ---
[09/12] incubator-beam git commit: Fix label-sensitive test.
Fix label-sensitive test. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2a59a121 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2a59a121 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2a59a121 Branch: refs/heads/python-sdk Commit: 2a59a121441a003bad949ae6a23d58a9cf2b3059 Parents: 2ff5630 Author: Robert BradshawAuthored: Fri Jul 22 16:24:48 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:46 2016 -0700 -- sdks/python/apache_beam/pipeline.py | 4 sdks/python/apache_beam/transforms/ptransform_test.py | 14 +++--- 2 files changed, 11 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index aeed9f9..0572466 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -348,6 +348,10 @@ class AppliedPTransform(object): # root producer. 
self.refcounts = collections.defaultdict(int) + def __repr__(self): +return "%s(%s, %s)" % (self.__class__.__name__, self.full_label, + type(self.transform).__name__) + def update_input_refcounts(self): """Increment refcounts for all transforms providing inputs.""" http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2a59a121/sdks/python/apache_beam/transforms/ptransform_test.py -- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 8121c1e..3a71ec3 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -54,28 +54,28 @@ class PTransformTest(unittest.TestCase): pa = Pipeline('DirectPipelineRunner') res = pa | 'a_label' >> beam.Create([1, 2]) -self.assertEqual(' ', - str(res.producer.transform)) +self.assertEqual('AppliedPTransform(a_label, Create)', + str(res.producer)) pc = Pipeline('DirectPipelineRunner') -res = pc | 'with_inputs' >> beam.Create([1, 2]) +res = pc | beam.Create([1, 2]) inputs_tr = res.producer.transform inputs_tr.inputs = ('ci',) self.assertEqual( -""" """, +""" """, str(inputs_tr)) pd = Pipeline('DirectPipelineRunner') -res = pd | 'with_sidei' >> beam.Create([1, 2]) +res = pd | beam.Create([1, 2]) side_tr = res.producer.transform side_tr.side_inputs = (4,) self.assertEqual( -' ', +' ', str(side_tr)) inputs_tr.side_inputs = ('cs',) self.assertEqual( -""" """, str(inputs_tr))
[08/12] incubator-beam git commit: Fixes examples
Fixes examples Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2ff5630b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2ff5630b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2ff5630b Branch: refs/heads/python-sdk Commit: 2ff5630be06034e25d60e105bbe1a718ae06b4d6 Parents: 362f2e9 Author: Chamikara JayalathAuthored: Fri Jul 22 16:04:29 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:46 2016 -0700 -- sdks/python/apache_beam/examples/complete/autocomplete.py | 4 ++-- sdks/python/apache_beam/examples/complete/estimate_pi.py | 3 --- sdks/python/apache_beam/examples/snippets/snippets.py | 8 3 files changed, 6 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/autocomplete.py -- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index b68bc56..10d9009 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -56,8 +56,8 @@ def run(argv=None): class TopPerPrefix(beam.PTransform): - def __init__(self, label, count): -super(TopPerPrefix, self).__init__(label) + def __init__(self, count): +super(TopPerPrefix, self).__init__() self._count = count def apply(self, words): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/complete/estimate_pi.py -- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index ef9f8cc..c33db1d 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -90,9 +90,6 @@ class JsonCoder(object): class EstimatePiTransform(beam.PTransform): """Runs 10M trials, and 
combine the results to estimate pi.""" - def __init__(self, label): -super(EstimatePiTransform, self).__init__(label) - def apply(self, pcoll): # A hundred work items of a hundred thousand tries each. return (pcoll http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2ff5630b/sdks/python/apache_beam/examples/snippets/snippets.py -- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index c605db8..9d1df82 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -466,7 +466,7 @@ def examples_wordcount_minimal(renames): # [END examples_wordcount_minimal_map] # [START examples_wordcount_minimal_write] - | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink()) + | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt')) # [END examples_wordcount_minimal_write] ) @@ -531,7 +531,7 @@ def examples_wordcount_wordcount(renames): formatted = counts | beam.ParDo(FormatAsTextFn()) # [END examples_wordcount_wordcount_dofn] - formatted | 'gs://my-bucket/counts.txt' >> beam.io.Write(beam.io.TextFileSink()) + formatted | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt')) p.visit(SnippetUtils.RenameFiles(renames)) p.run() @@ -702,8 +702,8 @@ def model_custom_source(count): # [START model_custom_source_new_ptransform] class ReadFromCountingSource(PTransform): -def __init__(self, label, count, **kwargs): - super(ReadFromCountingSource, self).__init__(label, **kwargs) +def __init__(self, count, **kwargs): + super(ReadFromCountingSource, self).__init__(**kwargs) self._count = count def apply(self, pcoll):
[10/12] incubator-beam git commit: Lint fixes.
Lint fixes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b15d35ca Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b15d35ca Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b15d35ca Branch: refs/heads/python-sdk Commit: b15d35ca6e585e75153e05d96403336889cc6894 Parents: 2a59a12 Author: Robert BradshawAuthored: Fri Jul 22 18:35:22 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:46 2016 -0700 -- sdks/python/apache_beam/dataflow_test.py| 7 +- .../complete/juliaset/juliaset/juliaset.py | 9 +- .../apache_beam/examples/complete/tfidf_test.py | 2 +- .../examples/cookbook/bigquery_side_input.py| 14 +- .../cookbook/bigquery_side_input_test.py| 4 +- .../examples/cookbook/bigquery_tornadoes.py | 6 +- .../apache_beam/examples/cookbook/bigshuffle.py | 5 +- .../apache_beam/examples/cookbook/filters.py| 2 +- .../apache_beam/examples/snippets/snippets.py | 2 +- .../examples/snippets/snippets_test.py | 8 +- sdks/python/apache_beam/examples/wordcount.py | 2 +- .../apache_beam/examples/wordcount_debugging.py | 2 +- .../apache_beam/examples/wordcount_minimal.py | 2 +- sdks/python/apache_beam/io/bigquery.py | 4 +- sdks/python/apache_beam/pipeline_test.py| 4 +- .../apache_beam/transforms/combiners_test.py| 4 +- sdks/python/apache_beam/transforms/core.py | 7 +- .../apache_beam/transforms/ptransform_test.py | 161 ++- sdks/python/apache_beam/transforms/util.py | 3 +- .../typehints/typed_pipeline_test.py| 2 +- 20 files changed, 127 insertions(+), 123 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index bf66851..cc3a526 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -114,8 +114,8 @@ class DataflowTest(unittest.TestCase): 
words = pipeline | 'SomeWords' >> Create(words_list) prefix = 'zyx' suffix = pipeline | 'SomeString' >> Create(['xyz']) # side in -result = words | 'DecorateWordsDoFn' >> ParDo(SomeDoFn(), prefix, - suffix=AsSingleton(suffix)) +result = words | 'DecorateWordsDoFn' >> ParDo( +SomeDoFn(), prefix, suffix=AsSingleton(suffix)) assert_that(result, equal_to(['zyx-%s-xyz' % x for x in words_list])) pipeline.run() @@ -179,8 +179,7 @@ class DataflowTest(unittest.TestCase): pipeline = Pipeline('DirectPipelineRunner') pcol = pipeline | 'start' >> Create([1, 2]) side = pipeline | 'side' >> Create([]) # 0 values in side input. -result = ( -pcol | 'compute' >> FlatMap(lambda x, s: [x * s], AsSingleton(side, 10))) +result = pcol | FlatMap(lambda x, s: [x * s], AsSingleton(side, 10)) assert_that(result, equal_to([10, 20])) pipeline.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py -- diff --git a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py index 56696c3..1445fbe 100644 --- a/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py +++ b/sdks/python/apache_beam/examples/complete/juliaset/juliaset/juliaset.py @@ -105,11 +105,12 @@ def run(argv=None): # pylint: disable=missing-docstring # Group each coordinate triplet by its x value, then write the coordinates to # the output file with an x-coordinate grouping per line. 
# pylint: disable=expression-not-assigned - (coordinates | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) - | 'x coord' >> beam.GroupByKey() | beam.Map( - 'format', + (coordinates + | 'x coord key' >> beam.Map(lambda (x, y, i): (x, (x, y, i))) + | 'x coord' >> beam.GroupByKey() + | 'format' >> beam.Map( lambda (k, coords): ' '.join('(%s, %s, %s)' % coord for coord in coords)) - | 'write' >> beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output))) + | beam.io.Write(beam.io.TextFileSink(known_args.coordinate_output))) # pylint: enable=expression-not-assigned p.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/b15d35ca/sdks/python/apache_beam/examples/complete/tfidf_test.py -- diff --git
[03/12] incubator-beam git commit: Move names out of transform constructors.
Move names out of transform constructors. sed -i -r 's/[|] (\S+)[(](["'"'"'][^"'"'"']+.)(, +|([)]))/| \2 >> \1(\4/g' Small number of tests will need to be fixed by hand. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/031c4cce Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/031c4cce Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/031c4cce Branch: refs/heads/python-sdk Commit: 031c4cce0b9eae0d50a49f43ffeced1edbfd2f8f Parents: 937cf69 Author: Robert BradshawAuthored: Fri Jul 22 14:34:58 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:45 2016 -0700 -- sdks/python/apache_beam/dataflow_test.py| 96 ++-- .../examples/complete/autocomplete.py | 8 +- .../examples/complete/autocomplete_test.py | 4 +- .../examples/complete/estimate_pi.py| 8 +- .../examples/complete/estimate_pi_test.py | 2 +- .../complete/juliaset/juliaset/juliaset.py | 8 +- .../apache_beam/examples/complete/tfidf.py | 32 +- .../apache_beam/examples/complete/tfidf_test.py | 2 +- .../examples/complete/top_wikipedia_sessions.py | 8 +- .../complete/top_wikipedia_sessions_test.py | 2 +- .../examples/cookbook/bigquery_schema.py| 4 +- .../examples/cookbook/bigquery_side_input.py| 6 +- .../cookbook/bigquery_side_input_test.py| 8 +- .../examples/cookbook/bigquery_tornadoes.py | 6 +- .../cookbook/bigquery_tornadoes_test.py | 2 +- .../apache_beam/examples/cookbook/bigshuffle.py | 18 +- .../apache_beam/examples/cookbook/coders.py | 2 +- .../examples/cookbook/coders_test.py| 4 +- .../examples/cookbook/custom_ptransform.py | 6 +- .../examples/cookbook/custom_ptransform_test.py | 2 +- .../apache_beam/examples/cookbook/filters.py| 10 +- .../examples/cookbook/filters_test.py | 2 +- .../examples/cookbook/group_with_coder.py | 6 +- .../examples/cookbook/mergecontacts.py | 8 +- .../examples/cookbook/multiple_output_pardo.py | 18 +- .../apache_beam/examples/snippets/snippets.py | 62 +-- 
.../examples/snippets/snippets_test.py | 10 +- .../apache_beam/examples/streaming_wordcap.py | 2 +- .../apache_beam/examples/streaming_wordcount.py | 8 +- sdks/python/apache_beam/examples/wordcount.py | 14 +- .../apache_beam/examples/wordcount_debugging.py | 16 +- .../apache_beam/examples/wordcount_minimal.py | 14 +- sdks/python/apache_beam/io/avroio.py| 2 +- sdks/python/apache_beam/io/bigquery.py | 4 +- .../apache_beam/io/filebasedsource_test.py | 4 +- sdks/python/apache_beam/io/iobase.py| 4 +- sdks/python/apache_beam/pipeline_test.py| 42 +- sdks/python/apache_beam/pvalue_test.py | 6 +- .../consumer_tracking_pipeline_visitor_test.py | 4 +- sdks/python/apache_beam/runners/runner_test.py | 6 +- .../apache_beam/transforms/combiners_test.py| 48 +- sdks/python/apache_beam/transforms/core.py | 16 +- .../python/apache_beam/transforms/ptransform.py | 2 +- .../apache_beam/transforms/ptransform_test.py | 498 +-- sdks/python/apache_beam/transforms/util.py | 8 +- .../apache_beam/transforms/window_test.py | 14 +- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py| 16 +- 48 files changed, 537 insertions(+), 537 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index 476f8b2..bf66851 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -54,33 +54,33 @@ class DataflowTest(unittest.TestCase): def Count(pcoll): # pylint: disable=invalid-name, no-self-argument """A Count transform: v, ... 
=> (v, n), ...""" return (pcoll -| Map('AddCount', lambda x: (x, 1)) -| GroupByKey('GroupCounts') -| Map('AddCounts', lambda (x, ones): (x, sum(ones +| 'AddCount' >> Map(lambda x: (x, 1)) +| 'GroupCounts' >> GroupByKey() +| 'AddCounts' >> Map(lambda (x, ones): (x, sum(ones def test_word_count(self): pipeline = Pipeline('DirectPipelineRunner') -lines = pipeline | Create('SomeWords', DataflowTest.SAMPLE_DATA) +lines = pipeline | 'SomeWords' >> Create(DataflowTest.SAMPLE_DATA) result = ( -(lines | FlatMap('GetWords', lambda x: re.findall(r'\w+', x))) +(lines
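The sed one-liner in the commit message above performs the mechanical rewrite from `| Map('AddCount', fn)` to `| 'AddCount' >> Map(fn)`. The sketch below applies the same regular expression with Python's `re` module, which may be handy for previewing the rewrite on a single line. `move_label` is a hypothetical helper, and like the sed version it only recognizes a string-literal label passed as the first argument, so other call shapes are left untouched.

```python
import re

# Same pattern as the sed expression: "| Name('label', " or "| Name('label')".
_LABELED_CALL = re.compile(r"""[|] (\S+)[(](["'][^"']+.)(, +|([)]))""")


def move_label(line):
    """Rewrite `| T('label', args)` as `| 'label' >> T(args)`."""
    def repl(m):
        transform, label = m.group(1), m.group(2)
        # Group 4 is ")" when the label was the only argument, else unmatched.
        closing = m.group(4) or ""
        return "| %s >> %s(%s" % (label, transform, closing)
    return _LABELED_CALL.sub(repl, line)
```

For instance, `| GroupByKey('GroupCounts')` becomes `| 'GroupCounts' >> GroupByKey()`, and a call without a leading string label such as `| beam.Create([1, 2])` is returned unchanged.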
[2/4] incubator-beam git commit: Make save_main_session optional
Make save_main_session optional Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/2e36602c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/2e36602c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/2e36602c Branch: refs/heads/python-sdk Commit: 2e36602c0a2944129698866cc31abc3bcac6169f Parents: af6d380 Author: Silviu CalinoiuAuthored: Sat Jul 23 09:31:30 2016 -0700 Committer: Silviu Calinoiu Committed: Sat Jul 23 09:31:30 2016 -0700 -- .../python/apache_beam/utils/dependency_test.py | 33 sdks/python/apache_beam/utils/options.py| 2 +- 2 files changed, 21 insertions(+), 14 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/2e36602c/sdks/python/apache_beam/utils/dependency_test.py -- diff --git a/sdks/python/apache_beam/utils/dependency_test.py b/sdks/python/apache_beam/utils/dependency_test.py index b7a9296..2f9a57b 100644 --- a/sdks/python/apache_beam/utils/dependency_test.py +++ b/sdks/python/apache_beam/utils/dependency_test.py @@ -81,10 +81,12 @@ class SetupTest(unittest.TestCase): [], dependency.stage_job_resources(options)) - def test_default_resources(self): + def test_with_main_session(self): staging_dir = tempfile.mkdtemp() options = PipelineOptions() + options.view_as(GoogleCloudOptions).staging_location = staging_dir +options.view_as(SetupOptions).save_main_session = True self.update_options(options) self.assertEqual( @@ -94,6 +96,16 @@ class SetupTest(unittest.TestCase): os.path.isfile( os.path.join(staging_dir, names.PICKLED_MAIN_SESSION_FILE))) + def test_default_resources(self): +staging_dir = tempfile.mkdtemp() +options = PipelineOptions() +options.view_as(GoogleCloudOptions).staging_location = staging_dir +self.update_options(options) + +self.assertEqual( +[], +dependency.stage_job_resources(options)) + def test_with_requirements_file(self): staging_dir = tempfile.mkdtemp() source_dir = tempfile.mkdtemp() @@ 
-106,7 +118,7 @@ class SetupTest(unittest.TestCase): self.create_temp_file( os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') self.assertEqual( -sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE, +sorted([dependency.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']), sorted(dependency.stage_job_resources( options, @@ -145,7 +157,7 @@ class SetupTest(unittest.TestCase): self.create_temp_file( os.path.join(source_dir, dependency.REQUIREMENTS_FILE), 'nothing') self.assertEqual( -sorted([dependency.REQUIREMENTS_FILE, names.PICKLED_MAIN_SESSION_FILE, +sorted([dependency.REQUIREMENTS_FILE, 'abc.txt', 'def.txt']), sorted(dependency.stage_job_resources( options, @@ -169,8 +181,7 @@ class SetupTest(unittest.TestCase): source_dir, 'setup.py') self.assertEqual( -[dependency.WORKFLOW_TARBALL_FILE, - names.PICKLED_MAIN_SESSION_FILE], +[dependency.WORKFLOW_TARBALL_FILE], dependency.stage_job_resources( options, # We replace the build setup command because a realistic one would @@ -265,8 +276,7 @@ class SetupTest(unittest.TestCase): options.view_as(SetupOptions).sdk_location = 'default' self.assertEqual( -[names.PICKLED_MAIN_SESSION_FILE, - names.DATAFLOW_SDK_TARBALL_FILE], +[names.DATAFLOW_SDK_TARBALL_FILE], dependency.stage_job_resources( options, file_copy=dependency._dependency_file_copy)) @@ -286,8 +296,7 @@ class SetupTest(unittest.TestCase): options.view_as(SetupOptions).sdk_location = sdk_location self.assertEqual( -[names.PICKLED_MAIN_SESSION_FILE, - names.DATAFLOW_SDK_TARBALL_FILE], +[names.DATAFLOW_SDK_TARBALL_FILE], dependency.stage_job_resources(options)) tarball_path = os.path.join( staging_dir, names.DATAFLOW_SDK_TARBALL_FILE) @@ -321,8 +330,7 @@ class SetupTest(unittest.TestCase): options.view_as(SetupOptions).sdk_location = sdk_location self.assertEqual( -[names.PICKLED_MAIN_SESSION_FILE, - names.DATAFLOW_SDK_TARBALL_FILE], +[names.DATAFLOW_SDK_TARBALL_FILE], dependency.stage_job_resources(options)) def test_with_extra_packages(self): 
@@ -363,8 +371,7 @@ class SetupTest(unittest.TestCase): self.assertEqual( ['abc.tar.gz', 'xyz.tar.gz', 'xyz2.tar', 'gcs.tar.gz', -
[4/4] incubator-beam git commit: Closes #723
Closes #723

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9fe102a5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9fe102a5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9fe102a5

Branch: refs/heads/python-sdk
Commit: 9fe102a5cf477225108bef7f24df2299c1b15563
Parents: 47042ce 3d64a8c
Author: Robert Bradshaw
Authored: Sat Jul 23 16:42:31 2016 -0700
Committer: Robert Bradshaw
Committed: Sat Jul 23 16:42:31 2016 -0700

--
 .../examples/complete/autocomplete.py | 9 +++-
 .../examples/complete/estimate_pi.py | 8 ++-
 .../apache_beam/examples/complete/tfidf.py | 8 ++-
 .../examples/complete/top_wikipedia_sessions.py | 10 +++-
 .../examples/cookbook/bigquery_side_input.py | 8 ++-
 .../apache_beam/examples/cookbook/bigshuffle.py | 9 +++-
 .../apache_beam/examples/cookbook/coders.py | 7 +++
 .../examples/cookbook/group_with_coder.py | 9 +++-
 .../examples/cookbook/mergecontacts.py | 9 +++-
 .../examples/cookbook/multiple_output_pardo.py | 9 +++-
 sdks/python/apache_beam/examples/wordcount.py | 9 +++-
 .../apache_beam/examples/wordcount_debugging.py | 9 +++-
 .../apache_beam/examples/wordcount_minimal.py | 9 +++-
 .../python/apache_beam/utils/dependency_test.py | 33 +++-
 sdks/python/apache_beam/utils/options.py | 2 +-
 sdks/python/setup.py | 56 +++-
 16 files changed, 144 insertions(+), 60 deletions(-)
--
[1/4] incubator-beam git commit: Refactor setup.py to separate strings/versions
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 47042ce89 -> 9fe102a5c Refactor setup.py to separate strings/versions Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/af6d3804 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/af6d3804 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/af6d3804 Branch: refs/heads/python-sdk Commit: af6d3804157dc445d9133ad5a8fe0717bb1f5786 Parents: 47042ce Author: Silviu Calinoiu Authored: Sat Jul 23 09:29:06 2016 -0700 Committer: Silviu Calinoiu Committed: Sat Jul 23 09:29:06 2016 -0700 -- sdks/python/setup.py | 56 ++- 1 file changed, 31 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/af6d3804/sdks/python/setup.py -- diff --git a/sdks/python/setup.py b/sdks/python/setup.py index da2df87..b0a5c85 100644 --- a/sdks/python/setup.py +++ b/sdks/python/setup.py @@ -15,13 +15,32 @@ # limitations under the License. # -"""Apache Beam SDK setup configuration.""" +"""Apache Beam SDK for Python setup file.""" import os import platform import setuptools +def get_version(): + global_names = {} + execfile(os.path.normpath('./apache_beam/version.py'), + global_names) + return global_names['__version__'] + +PACKAGE_NAME = 'apache-beam-sdk' +PACKAGE_VERSION = get_version() +PACKAGE_DESCRIPTION = 'Apache Beam SDK for Python' +PACKAGE_URL = 'https://beam.incubator.apache.org' +PACKAGE_DOWNLOAD_URL = 'TBD' +PACKAGE_AUTHOR = 'Apache Software Foundation' +PACKAGE_EMAIL = 'd...@beam.incubator.apache.org' +PACKAGE_KEYWORDS = 'apache beam' +PACKAGE_LONG_DESCRIPTION = ''' +TBD +''' + + # Currently all compiled modules are optional (for performance only). if platform.system() == 'Windows': # Windows doesn't always provide int64_t.
@@ -34,27 +53,12 @@ else: cythonize = lambda *args, **kwargs: [] -def get_version(): - global_names = {} - execfile(os.path.normpath('./apache_beam/version.py'), - global_names) - return global_names['__version__'] - - -# Configure the required packages and scripts to install. REQUIRED_PACKAGES = [ 'avro>=1.7.7', 'dill>=0.2.5', 'google-apitools>=0.5.2', -# TODO(silviuc): Reenable api client package dependencies when we can -# update the packages to the latest version without affecting previous -# SDK releases. -# 'google-apitools-bigquery-v2', -# 'google-apitools-dataflow-v1b3>=0.4.20160217', -# 'google-apitools-storage-v1', 'httplib2>=0.8', 'mock>=1.0.1', -'nose>=1.0', 'oauth2client>=2.0.1', 'protorpc>=0.9.1', 'python-gflags>=2.0', @@ -63,14 +67,16 @@ REQUIRED_PACKAGES = [ setuptools.setup( -name='apache-beam-sdk', -version=get_version(), -description='Apache Beam SDK for Python', -long_description='', -url='https://beam.incubator.apache.org', -download_url='TBD', -author='Apache Software Foundation', +name=PACKAGE_NAME, +version=PACKAGE_VERSION, +description=PACKAGE_DESCRIPTION, +long_description=PACKAGE_LONG_DESCRIPTION, +url=PACKAGE_URL, +download_url=PACKAGE_DOWNLOAD_URL, +author=PACKAGE_AUTHOR, +author_email=PACKAGE_EMAIL, packages=setuptools.find_packages(), +package_data={'apache_beam': ['**/*.pyx', '**/*.pxd']}, ext_modules=cythonize([ '**/*.pyx', 'apache_beam/coders/coder_impl.py', @@ -79,8 +85,8 @@ setuptools.setup( 'apache_beam/utils/counters.py', 'apache_beam/utils/windowed_value.py', ]), +setup_requires=['nose>=1.0'], install_requires=REQUIRED_PACKAGES, -package_data={'': ['*.pyx', '*.pxd']}, test_suite='nose.collector', zip_safe=False, # PyPI package information. @@ -93,5 +99,5 @@ setuptools.setup( 'Topic :: Software Development :: Libraries :: Python Modules', ], license='Apache 2.0', -keywords='apache beam', +keywords=PACKAGE_KEYWORDS, )
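The `get_version()` helper moved around in this setup.py relies on `execfile`, a Python 2 builtin that no longer exists in Python 3. The sketch below is one possible Python 3 equivalent, assuming `version.py` simply assigns `__version__`; the `version_path` parameter (defaulting to the original hard-coded path) is an addition for testability, and the temporary file and version string in the demo are made up.

```python
import os
import tempfile


def get_version(version_path='./apache_beam/version.py'):
  """Returns the __version__ string defined in a version.py file.

  Python 3 stand-in for the execfile() call: read the source, compile it
  under its real filename (so tracebacks point at it), and exec it into a
  fresh namespace dict rather than the caller's globals.
  """
  global_names = {}
  path = os.path.normpath(version_path)
  with open(path) as f:
    code = compile(f.read(), path, 'exec')
  exec(code, global_names)
  return global_names['__version__']


if __name__ == '__main__':
  # Throwaway version file with a made-up version string, for demonstration.
  with tempfile.TemporaryDirectory() as tmp:
    demo_path = os.path.join(tmp, 'version.py')
    with open(demo_path, 'w') as f:
      f.write("__version__ = '0.0.1.dev0'\n")
    print(get_version(demo_path))  # prints 0.0.1.dev0
```

Executing into a dedicated dict keeps whatever `version.py` defines from leaking into setup.py's own module namespace, which matches the intent of the original `global_names` argument.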
[3/4] incubator-beam git commit: Refactor examples to use save_main_session
Refactor examples to use save_main_session Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3d64a8c5 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3d64a8c5 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3d64a8c5 Branch: refs/heads/python-sdk Commit: 3d64a8c5abf3dc9547ab75765cbd3e1ef5fac268 Parents: 2e36602 Author: Silviu Calinoiu Authored: Sat Jul 23 09:39:59 2016 -0700 Committer: Silviu Calinoiu Committed: Sat Jul 23 09:39:59 2016 -0700 -- sdks/python/apache_beam/examples/complete/autocomplete.py | 9 +++-- sdks/python/apache_beam/examples/complete/estimate_pi.py | 8 +++- sdks/python/apache_beam/examples/complete/tfidf.py| 8 +++- .../examples/complete/top_wikipedia_sessions.py | 10 -- .../apache_beam/examples/cookbook/bigquery_side_input.py | 8 +++- sdks/python/apache_beam/examples/cookbook/bigshuffle.py | 9 +++-- sdks/python/apache_beam/examples/cookbook/coders.py | 7 +++ .../apache_beam/examples/cookbook/group_with_coder.py | 9 +++-- .../python/apache_beam/examples/cookbook/mergecontacts.py | 9 +++-- .../examples/cookbook/multiple_output_pardo.py| 9 +++-- sdks/python/apache_beam/examples/wordcount.py | 9 +++-- sdks/python/apache_beam/examples/wordcount_debugging.py | 9 +++-- sdks/python/apache_beam/examples/wordcount_minimal.py | 9 +++-- 13 files changed, 92 insertions(+), 21 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/autocomplete.py -- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete.py b/sdks/python/apache_beam/examples/complete/autocomplete.py index 2fb2d92..0f1e96e 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete.py @@ -24,6 +24,8 @@ import logging import re import apache_beam as beam +from apache_beam.utils.options import PipelineOptions +from
apache_beam.utils.options import SetupOptions def run(argv=None): @@ -36,8 +38,11 @@ def run(argv=None): required=True, help='Output file to write results to.') known_args, pipeline_args = parser.parse_known_args(argv) - - p = beam.Pipeline(argv=pipeline_args) + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). + pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = True + p = beam.Pipeline(options=pipeline_options) (p # pylint: disable=expression-not-assigned | beam.io.Read('read', beam.io.TextFileSource(known_args.input)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/estimate_pi.py -- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi.py b/sdks/python/apache_beam/examples/complete/estimate_pi.py index 3c4a2d9..09faecf 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi.py @@ -36,6 +36,8 @@ import apache_beam as beam from apache_beam.typehints import Any from apache_beam.typehints import Iterable from apache_beam.typehints import Tuple +from apache_beam.utils.options import PipelineOptions +from apache_beam.utils.options import SetupOptions @beam.typehints.with_output_types(Tuple[int, int, int]) @@ -106,8 +108,12 @@ def run(argv=None): required=True, help='Output file to write results to.') known_args, pipeline_args = parser.parse_known_args(argv) + # We use the save_main_session option because one or more DoFn's in this + # workflow rely on global context (e.g., a module imported at module level). 
+ pipeline_options = PipelineOptions(pipeline_args) + pipeline_options.view_as(SetupOptions).save_main_session = True + p = beam.Pipeline(options=pipeline_options) - p = beam.Pipeline(argv=pipeline_args) (p # pylint: disable=expression-not-assigned | EstimatePiTransform('Estimate') | beam.io.Write('Write', http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3d64a8c5/sdks/python/apache_beam/examples/complete/tfidf.py -- diff --git a/sdks/python/apache_beam/examples/complete/tfidf.py b/sdks/python/apache_beam/examples/complete/tfidf.py index cd85651..ef58cc0 100644 ---
[02/12] incubator-beam git commit: Move names out of transform constructors.
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/031c4cce/sdks/python/apache_beam/transforms/combiners_test.py -- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index a747112..0439fe1 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -39,16 +39,16 @@ class CombineTest(unittest.TestCase): size = len(vals) # First for global combines. -pcoll = pipeline | Create('start', vals) -result_mean = pcoll | combine.Mean.Globally('mean') -result_count = pcoll | combine.Count.Globally('count') +pcoll = pipeline | 'start' >> Create(vals) +result_mean = pcoll | 'mean' >> combine.Mean.Globally() +result_count = pcoll | 'count' >> combine.Count.Globally() assert_that(result_mean, equal_to([mean]), label='assert:mean') assert_that(result_count, equal_to([size]), label='assert:size') # Again for per-key combines. -pcoll = pipeline | Create('start-perkey', [('a', x) for x in vals]) -result_key_mean = pcoll | combine.Mean.PerKey('mean-perkey') -result_key_count = pcoll | combine.Count.PerKey('count-perkey') +pcoll = pipeline | 'start-perkey' >> Create([('a', x) for x in vals]) +result_key_mean = pcoll | 'mean-perkey' >> combine.Mean.PerKey() +result_key_count = pcoll | 'count-perkey' >> combine.Count.PerKey() assert_that(result_key_mean, equal_to([('a', mean)]), label='key:mean') assert_that(result_key_count, equal_to([('a', size)]), label='key:size') pipeline.run() @@ -66,9 +66,9 @@ class CombineTest(unittest.TestCase): 9: 'nniiinne'} # First for global combines. 
-pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) -result_top = pcoll | combine.Top.Largest('top', 5) -result_bot = pcoll | combine.Top.Smallest('bot', 4) +pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) +result_top = pcoll | 'top' >> combine.Top.Largest(5) +result_bot = pcoll | 'bot' >> combine.Top.Smallest(4) result_cmp = pcoll | combine.Top.Of( 'cmp', 6, @@ -81,8 +81,8 @@ class CombineTest(unittest.TestCase): # Again for per-key combines. pcoll = pipeline | Create( 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) -result_key_top = pcoll | combine.Top.LargestPerKey('top-perkey', 5) -result_key_bot = pcoll | combine.Top.SmallestPerKey('bot-perkey', 4) +result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5) +result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4) result_key_cmp = pcoll | combine.Top.PerKey( 'cmp-perkey', 6, @@ -99,15 +99,15 @@ class CombineTest(unittest.TestCase): def test_top_shorthands(self): pipeline = Pipeline('DirectPipelineRunner') -pcoll = pipeline | Create('start', [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) -result_top = pcoll | beam.CombineGlobally('top', combine.Largest(5)) -result_bot = pcoll | beam.CombineGlobally('bot', combine.Smallest(4)) +pcoll = pipeline | 'start' >> Create([6, 3, 1, 1, 9, 1, 5, 2, 0, 6]) +result_top = pcoll | 'top' >> beam.CombineGlobally(combine.Largest(5)) +result_bot = pcoll | 'bot' >> beam.CombineGlobally(combine.Smallest(4)) assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top') assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot') pcoll = pipeline | Create( 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) -result_ktop = pcoll | beam.CombinePerKey('top-perkey', combine.Largest(5)) +result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5)) result_kbot = pcoll | beam.CombinePerKey( 'bot-perkey', combine.Smallest(4)) assert_that(result_ktop, equal_to([('a', [9, 6, 
6, 5, 3])]), label='k:top') @@ -119,7 +119,7 @@ class CombineTest(unittest.TestCase): # First test global samples (lots of them). for ix in xrange(300): pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | Create('start', [1, 1, 2, 2]) + pcoll = pipeline | 'start' >> Create([1, 1, 2, 2]) result = pcoll | combine.Sample.FixedSizeGlobally('sample-%d' % ix, 3) def matcher(): @@ -141,7 +141,7 @@ class CombineTest(unittest.TestCase): pcoll = pipeline | Create( 'start-perkey', sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) -result = pcoll | combine.Sample.FixedSizePerKey('sample', 3) +result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3) def matcher(): def match(actual): @@ -158,7 +158,7 @@ class CombineTest(unittest.TestCase): p = Pipeline('DirectPipelineRunner') result = ( p -| Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
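This series moves labels out of transform constructors, replacing `Create('start', vals)` with `'start' >> Create(vals)`. The toy classes below sketch how that syntax can work in plain Python: `str` defines no `>>` operator accepting a transform, so Python falls back to the right operand's `__rrshift__`, which can return a relabeled copy. `ToyTransform`, `ToyMap`, and `ToyPCollection` are hypothetical stand-ins invented for this illustration; Beam's own classes may implement the mechanism differently.

```python
import copy


class ToyTransform(object):
  """Hypothetical transform: a label plus an expand() over a list."""

  def __init__(self, label=None):
    self.label = label or type(self).__name__

  def expand(self, elements):
    raise NotImplementedError

  def __rrshift__(self, label):
    # 'label' >> transform: str has no __rshift__ for this operand type,
    # so Python calls transform.__rrshift__('label'). Return a relabeled
    # copy and leave the original transform untouched.
    labeled = copy.copy(self)
    labeled.label = label
    return labeled

  def __ror__(self, pcoll):
    # pcoll | transform: ToyPCollection defines no __or__, so Python calls
    # transform.__ror__(pcoll).
    return pcoll.apply(self)


class ToyMap(ToyTransform):
  def __init__(self, fn, label=None):
    super(ToyMap, self).__init__(label)
    self.fn = fn

  def expand(self, elements):
    return [self.fn(x) for x in elements]


class ToyPCollection(object):
  def __init__(self, elements, steps=()):
    self.elements = list(elements)
    self.steps = list(steps)  # labels of the transforms applied so far

  def apply(self, transform):
    return ToyPCollection(transform.expand(self.elements),
                          self.steps + [transform.label])


if __name__ == '__main__':
  # '>>' binds tighter than '|', so each label attaches to its transform.
  result = (ToyPCollection([1, 2, 3])
            | 'double' >> ToyMap(lambda x: 2 * x)
            | 'inc' >> ToyMap(lambda x: x + 1))
  print(result.elements, result.steps)  # prints [3, 5, 7] ['double', 'inc']
```

Because `>>` has higher precedence than `|`, no parentheses are needed around `'label' >> Transform(...)` in a chained pipeline expression, which is what makes the new form read naturally.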
[05/12] incubator-beam git commit: fixup: failing tests expecting name
fixup: failing tests expecting name Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5b5b14d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5b5b14d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5b5b14d Branch: refs/heads/python-sdk Commit: c5b5b14d35fc7f6b0a576f0fca19b730015e1282 Parents: b15d35c Author: Robert Bradshaw Authored: Sat Jul 23 01:07:44 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:46 2016 -0700 -- sdks/python/apache_beam/transforms/ptransform_test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5b5b14d/sdks/python/apache_beam/transforms/ptransform_test.py -- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 992f944..b99cd26 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -931,7 +931,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p | beam.Create([1, 2, 3]).with_output_types(int) - | beam.GroupByKeyOnly()) + | 'F' >> beam.GroupByKeyOnly()) self.assertEqual("Input type hint violation at F: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -945,7 +945,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): (self.p | (beam.Create(range(5)) .with_output_types(typehints.Iterable[int])) - | beam.GroupByKey()) + | 'T' >> beam.GroupByKey()) self.assertEqual("Input type hint violation at T: " "expected Tuple[TypeVariable[K], TypeVariable[V]], " @@ -1563,7 +1563,7 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): with self.assertRaises(typehints.TypeCheckError) as e: (self.p - | beam.Create([1, 1, 2, 3]) + | 'f' >> beam.Create([1, 1, 2, 3]) | 'count elems' >> combine.Count.PerElement())
self.assertEqual('Pipeline type checking is enabled, however no output '
[12/12] incubator-beam git commit: Closes #718
Closes #718

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38d9dea2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38d9dea2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38d9dea2

Branch: refs/heads/python-sdk
Commit: 38d9dea2e62af280e4b9c258cedee70d6bcaa8ca
Parents: 9fe102a e3c078f
Author: Robert Bradshaw
Authored: Sat Jul 23 16:43:47 2016 -0700
Committer: Robert Bradshaw
Committed: Sat Jul 23 16:43:47 2016 -0700

--
 sdks/python/apache_beam/dataflow_test.py | 99 ++--
 .../examples/complete/autocomplete.py | 16 +-
 .../examples/complete/autocomplete_test.py | 4 +-
 .../examples/complete/estimate_pi.py | 14 +-
 .../examples/complete/estimate_pi_test.py | 2 +-
 .../complete/juliaset/juliaset/juliaset.py | 11 +-
 .../apache_beam/examples/complete/tfidf.py | 32 +-
 .../apache_beam/examples/complete/tfidf_test.py | 2 +-
 .../examples/complete/top_wikipedia_sessions.py | 8 +-
 .../complete/top_wikipedia_sessions_test.py | 2 +-
 .../examples/cookbook/bigquery_schema.py | 4 +-
 .../examples/cookbook/bigquery_side_input.py | 18 +-
 .../cookbook/bigquery_side_input_test.py | 12 +-
 .../examples/cookbook/bigquery_tornadoes.py | 10 +-
 .../cookbook/bigquery_tornadoes_test.py | 2 +-
 .../apache_beam/examples/cookbook/bigshuffle.py | 31 +-
 .../apache_beam/examples/cookbook/coders.py | 2 +-
 .../examples/cookbook/coders_test.py | 4 +-
 .../examples/cookbook/custom_ptransform.py | 9 +-
 .../examples/cookbook/custom_ptransform_test.py | 2 +-
 .../apache_beam/examples/cookbook/filters.py | 14 +-
 .../examples/cookbook/filters_test.py | 2 +-
 .../examples/cookbook/group_with_coder.py | 9 +-
 .../examples/cookbook/mergecontacts.py | 8 +-
 .../examples/cookbook/multiple_output_pardo.py | 30 +-
 .../apache_beam/examples/snippets/snippets.py | 69 ++-
 .../examples/snippets/snippets_test.py | 16 +-
 .../apache_beam/examples/streaming_wordcap.py | 6 +-
 .../apache_beam/examples/streaming_wordcount.py | 8 +-
 sdks/python/apache_beam/examples/wordcount.py | 16 +-
 .../apache_beam/examples/wordcount_debugging.py | 18 +-
 .../apache_beam/examples/wordcount_minimal.py | 16 +-
 sdks/python/apache_beam/io/avroio.py | 2 +-
 sdks/python/apache_beam/io/bigquery.py | 4 +-
 .../apache_beam/io/filebasedsource_test.py | 4 +-
 sdks/python/apache_beam/io/iobase.py | 4 +-
 sdks/python/apache_beam/pipeline.py | 13 +
 sdks/python/apache_beam/pipeline_test.py | 48 +-
 sdks/python/apache_beam/pvalue_test.py | 6 +-
 .../consumer_tracking_pipeline_visitor_test.py | 4 +-
 sdks/python/apache_beam/runners/runner_test.py | 6 +-
 .../apache_beam/transforms/combiners_test.py | 64 ++-
 sdks/python/apache_beam/transforms/core.py | 21 +-
 .../python/apache_beam/transforms/ptransform.py | 9 +-
 .../apache_beam/transforms/ptransform_test.py | 521 ++-
 sdks/python/apache_beam/transforms/util.py | 9 +-
 .../apache_beam/transforms/window_test.py | 14 +-
 .../transforms/write_ptransform_test.py | 2 +-
 .../typehints/typed_pipeline_test.py | 16 +-
 49 files changed, 628 insertions(+), 615 deletions(-)
--
[07/12] incubator-beam git commit: Cleanup and fix combiners_test.
Cleanup and fix combiners_test. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01830510 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01830510 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01830510 Branch: refs/heads/python-sdk Commit: 01830510bded3c22ddd96937f5d83547702a4385 Parents: 7c186ce Author: Robert Bradshaw Authored: Fri Jul 22 16:05:23 2016 -0700 Committer: Robert Bradshaw Committed: Sat Jul 23 16:43:46 2016 -0700 -- .../apache_beam/transforms/combiners_test.py| 20 +--- 1 file changed, 9 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01830510/sdks/python/apache_beam/transforms/combiners_test.py -- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 0439fe1..c970382 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -79,12 +79,11 @@ class CombineTest(unittest.TestCase): assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp') # Again for per-key combines. -pcoll = pipeline | Create( -'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) +pcoll = pipeline | 'start-perkye' >> Create( +[('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5) result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4) -result_key_cmp = pcoll | combine.Top.PerKey( -'cmp-perkey', +result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey( 6, lambda a, b, names: len(names[a]) < len(names[b]), names) # Note parameter passed to comparator.
@@ -105,11 +104,11 @@ class CombineTest(unittest.TestCase): assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top') assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot') -pcoll = pipeline | Create( -'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) +pcoll = pipeline | 'start-perkey' >> Create( +[('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5)) -result_kbot = pcoll | beam.CombinePerKey( -'bot-perkey', combine.Smallest(4)) +result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey( +combine.Smallest(4)) assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top') assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot') pipeline.run() @@ -138,8 +137,7 @@ class CombineTest(unittest.TestCase): # Now test per-key samples. pipeline = Pipeline('DirectPipelineRunner') -pcoll = pipeline | Create( -'start-perkey', +pcoll = pipeline | 'start-perkey' >> Create( sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3) @@ -158,7 +156,7 @@ class CombineTest(unittest.TestCase): p = Pipeline('DirectPipelineRunner') result = ( p -| 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)]) +| Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) | beam.CombineGlobally(combine.TupleCombineFn(max, combine.MeanCombineFn(), sum)).without_defaults())
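The expected values in these combiner tests can be cross-checked without a pipeline. The sketch below uses `heapq` to reproduce the lists that `Top.Largest(5)` and `Top.Smallest(4)` are asserted to return, and a hypothetical `tuple_combine` helper that applies one combining function per tuple position. This mimics, on a single in-memory list, what `TupleCombineFn(max, MeanCombineFn(), sum)` computes over the three input tuples. It is a local model of the semantics only, not Beam's distributed implementation, which merges partial accumulators across bundles.

```python
import heapq

VALUES = [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]

# Largest 5 in descending order and smallest 4 in ascending order, matching
# the [[9, 6, 6, 5, 3]] and [[0, 1, 1, 1]] expectations in combiners_test.
top5 = heapq.nlargest(5, VALUES)
bottom4 = heapq.nsmallest(4, VALUES)


def mean(xs):
  xs = list(xs)
  return sum(xs) / float(len(xs))


def tuple_combine(rows, *fns):
  """Hypothetical helper: combine each tuple position with its own function."""
  columns = zip(*rows)  # transpose rows into per-position columns
  return tuple(fn(column) for fn, column in zip(fns, columns))


ROWS = [('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]
combined = tuple_combine(ROWS, max, mean, sum)

if __name__ == '__main__':
  print(top5, bottom4)  # prints [9, 6, 6, 5, 3] [0, 1, 1, 1]
  print(combined)       # prints ('c', 37.0, 99.0)
```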
[GitHub] incubator-beam pull request #718: Change all examples and tests to use the n...
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/718 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #705: Minor cdef value changes.
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/705 Minor cdef value changes. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam cdef-structs-edit Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/705.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #705 commit 4ac5f8643585e9b57b8e1eaf309810efd1c0163e Author: Robert Bradshaw <rober...@google.com> Date: 2016-07-21T17:36:36Z Minor cdef value changes.
[GitHub] incubator-beam pull request #721: Make DoFnRunner a Receiver.
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/721 Make DoFnRunner a Receiver. --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam dofn-receiver Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/721.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #721 commit 943adfd6327321f09d60ae4302aeb470aa57e558 Author: Robert Bradshaw <rober...@google.com> Date: 2016-07-23T00:33:57Z Make DoFnRunner a Receiver.
[GitHub] incubator-beam pull request #705: Minor cdef value changes.
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/705
[GitHub] incubator-beam pull request #702: Add tests for WindowedValue.
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/702
[2/2] incubator-beam git commit: Add tests for WindowedValue.
Add tests for WindowedValue.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/441fad31
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/441fad31
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/441fad31

Branch: refs/heads/python-sdk
Commit: 441fad31ba42e14cd305277060b7e26a97f68166
Parents: e500699
Author: Robert Bradshaw
Authored: Wed Jul 20 14:10:11 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 21 17:44:02 2016 -0700

--
 .../apache_beam/utils/windowed_value_test.py | 71
 1 file changed, 71 insertions(+)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/441fad31/sdks/python/apache_beam/utils/windowed_value_test.py
--
diff --git a/sdks/python/apache_beam/utils/windowed_value_test.py b/sdks/python/apache_beam/utils/windowed_value_test.py
new file mode 100644
index 000..f257410
--- /dev/null
+++ b/sdks/python/apache_beam/utils/windowed_value_test.py
@@ -0,0 +1,71 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Unit tests for the windowed_value."""
+
+import copy
+import pickle
+import unittest
+
+from apache_beam.utils import windowed_value
+from apache_beam.transforms.timeutil import Timestamp
+
+
+class WindowedValueTest(unittest.TestCase):
+
+  def test_timestamps(self):
+    wv = windowed_value.WindowedValue(None, 3, ())
+    self.assertEqual(wv.timestamp, Timestamp.of(3))
+    self.assertTrue(wv.timestamp is wv.timestamp)
+    self.assertEqual(windowed_value.WindowedValue(None, -2.5, ()).timestamp,
+                     Timestamp.of(-2.5))
+
+  def test_with_value(self):
+    wv = windowed_value.WindowedValue(1, 3, ())
+    self.assertEqual(wv.with_value(10), windowed_value.WindowedValue(10, 3, ()))
+
+  def test_equality(self):
+    self.assertEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(1, 3, ()))
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(100, 3, ()))
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(1, 300, ()))
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        windowed_value.WindowedValue(1, 300, ((),)))
+
+    self.assertNotEqual(
+        windowed_value.WindowedValue(1, 3, ()),
+        object())
+
+  def test_hash(self):
+    wv = windowed_value.WindowedValue(1, 3, ())
+    wv_copy = copy.copy(wv)
+    self.assertFalse(wv is wv_copy)
+    self.assertEqual({wv: 100}.get(wv_copy), 100)
+
+  def test_pickle(self):
+    wv = windowed_value.WindowedValue(1, 3, ())
+    self.assertTrue(pickle.loads(pickle.dumps(wv)) == wv)
+
+
+if __name__ == '__main__':
+  unittest.main()
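These tests pin down value semantics for `WindowedValue`: equality over the (value, timestamp, windows) triple, inequality against unrelated objects, a hash consistent with equality so that a `copy.copy` finds the same dict entry, and a `with_value` that keeps timestamp and windows. The toy class below is a hypothetical illustration of one way to satisfy those properties, not Beam's implementation; storing timestamps as integer microseconds is an assumption made here so that `3` and `3.0` normalize to the same key.

```python
import copy


class ToyWindowedValue(object):
  """Hypothetical value type with the semantics windowed_value_test checks.

  Not Beam's WindowedValue: timestamps are stored as integer microseconds
  purely for illustration.
  """

  def __init__(self, value, timestamp, windows):
    self.value = value
    # Normalize seconds (int or float) to integer microseconds, so that
    # ToyWindowedValue(v, 3, w) == ToyWindowedValue(v, 3.0, w).
    self.timestamp_micros = int(round(timestamp * 1000000))
    self.windows = tuple(windows)

  def with_value(self, new_value):
    # Same timestamp and windows, different payload.
    result = copy.copy(self)
    result.value = new_value
    return result

  def _key(self):
    return (self.value, self.timestamp_micros, self.windows)

  def __eq__(self, other):
    # Comparing against an unrelated type is simply "not equal".
    return type(self) is type(other) and self._key() == other._key()

  def __ne__(self, other):
    # Needed on Python 2, which does not derive != from __eq__.
    return not self == other

  def __hash__(self):
    # Hash exactly the fields used by __eq__ so equal values hash equally.
    return hash(self._key())


if __name__ == '__main__':
  wv = ToyWindowedValue(1, 3, ())
  wv_copy = copy.copy(wv)
  print(wv is wv_copy, {wv: 100}.get(wv_copy))  # prints False 100
  print(wv.with_value(10) == ToyWindowedValue(10, 3, ()))  # prints True
```

Deriving both `__eq__` and `__hash__` from one `_key()` tuple is a simple way to keep them from drifting apart as fields are added.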
[1/2] incubator-beam git commit: Closes #702
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk e5006991d -> 09b4daf23

Closes #702

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/09b4daf2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/09b4daf2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/09b4daf2

Branch: refs/heads/python-sdk
Commit: 09b4daf23fc80ad565dbdcd8704c1b4c01d879fc
Parents: e500699 441fad3
Author: Robert Bradshaw
Authored: Thu Jul 21 17:44:02 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 21 17:44:02 2016 -0700

--
 .../apache_beam/utils/windowed_value_test.py | 71
 1 file changed, 71 insertions(+)
--
[1/4] incubator-beam git commit: Remove expensive per-element-step logging context.
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 37f426faf -> d78b38d1d

Remove expensive per-element-step logging context.

This is 3-4% of the total runtime.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f99aa77f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f99aa77f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f99aa77f

Branch: refs/heads/python-sdk
Commit: f99aa77f9acf5dbfdf762913a1f997d2c3658d3a
Parents: 37f426f
Author: Robert Bradshaw
Authored: Thu Jul 21 12:08:33 2016 -0700
Committer: Robert Bradshaw
Committed: Thu Jul 21 17:35:49 2016 -0700

--
 sdks/python/apache_beam/runners/common.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f99aa77f/sdks/python/apache_beam/runners/common.py
--
diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index ef28c63..134fb06 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -95,9 +95,8 @@ class DoFnRunner(object):
 
   def process(self, element):
     try:
-      with self.logger.PerThreadLoggingContext(step_name=self.step_name):
-        self.context.set_element(element)
-        self._process_outputs(element, self.dofn.process(self.context))
+      self.context.set_element(element)
+      self._process_outputs(element, self.dofn.process(self.context))
     except BaseException as exn:
       self.reraise_augmented(exn)
 
[4/4] incubator-beam git commit: Restore (faster) logging context.
Restore (faster) logging context. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ecf9e3a3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ecf9e3a3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ecf9e3a3 Branch: refs/heads/python-sdk Commit: ecf9e3a3cc3dcbb3413403d4558c95d4a0097350 Parents: 7c9d77a Author: Robert Bradshaw <rober...@google.com> Authored: Thu Jul 21 15:24:01 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Jul 21 17:36:04 2016 -0700 -- sdks/python/apache_beam/runners/common.pxd | 8 +++- sdks/python/apache_beam/runners/common.py | 20 ++-- 2 files changed, 21 insertions(+), 7 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index f01a362..7191659 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -27,7 +27,7 @@ cdef class DoFnRunner(object): cdef object window_fn cdef object context # TODO(robertwb): Make this a DoFnContext cdef object tagged_receivers - cdef object logger + cdef object logging_context # TODO(robertwb): Make this a LoggingContext cdef object step_name cdef object main_receivers # TODO(robertwb): Make this a Receiver @@ -44,3 +44,9 @@ cdef class DoFnContext(object): cdef class Receiver(object): cdef receive(self, WindowedValue windowed_value) + + +cdef class LoggingContext(object): + # TODO(robertwb): Optimize "with [cdef class]" + cpdef enter(self) + cpdef exit(self) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/ecf9e3a3/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 80db823..a565645 100644 --- a/sdks/python/apache_beam/runners/common.py 
+++ b/sdks/python/apache_beam/runners/common.py @@ -29,14 +29,12 @@ from apache_beam.transforms.window import WindowFn from apache_beam.utils.windowed_value import WindowedValue -class FakeLogger(object): - def PerThreadLoggingContext(self, *unused_args, **unused_kwargs): -return self +class LoggingContext(object): - def __enter__(self): + def enter(self): pass - def __exit__(self, *unused_args): + def exit(self): pass @@ -76,7 +74,8 @@ class DoFnRunner(object): self.window_fn = windowing.windowfn self.context = context self.tagged_receivers = tagged_receivers -self.logger = logger or FakeLogger() +self.logging_context = (logger.PerThreadLoggingContext(step_name=step_name) +if logger else LoggingContext()) self.step_name = step_name # Optimize for the common case. @@ -85,23 +84,32 @@ class DoFnRunner(object): def start(self): self.context.set_element(None) try: + self.logging_context.enter() self._process_outputs(None, self.dofn.start_bundle(self.context)) except BaseException as exn: self.reraise_augmented(exn) +finally: + self.logging_context.exit() def finish(self): self.context.set_element(None) try: + self.logging_context.enter() self._process_outputs(None, self.dofn.finish_bundle(self.context)) except BaseException as exn: self.reraise_augmented(exn) +finally: + self.logging_context.exit() def process(self, element): try: + self.logging_context.enter() self.context.set_element(element) self._process_outputs(element, self.dofn_process(self.context)) except BaseException as exn: self.reraise_augmented(exn) +finally: + self.logging_context.exit() def reraise_augmented(self, exn): if getattr(exn, '_tagged_with_step', False) or not self.step_name:
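The commit above replaces a `with`-style FakeLogger with explicit `enter()`/`exit()` calls bracketed by `try`/`finally`. According to the TODO in the .pxd, Cython could not yet optimize `with` on a cdef class. Below is a minimal plain-Python sketch of that bracketing pattern. `RecordingContext` and `SketchRunner` are hypothetical names used only for illustration, and the runner's error augmentation (`reraise_augmented`) is left out.

```python
class LoggingContext(object):
  """No-op context, as in the diff: used when no logger is supplied."""

  def enter(self):
    pass

  def exit(self):
    pass


class RecordingContext(LoggingContext):
  """Hypothetical context that records calls, for illustration only."""

  def __init__(self):
    self.events = []

  def enter(self):
    self.events.append('enter')

  def exit(self):
    self.events.append('exit')


class SketchRunner(object):
  """Hypothetical runner showing the enter/try/finally/exit bracketing."""

  def __init__(self, fn, logging_context=None):
    self.fn = fn
    self.logging_context = logging_context or LoggingContext()

  def process(self, element):
    try:
      self.logging_context.enter()
      return self.fn(element)
    finally:
      # Runs whether fn returned normally or raised.
      self.logging_context.exit()


ctx = RecordingContext()
print(SketchRunner(lambda x: x * 2, ctx).process(21))  # 42
try:
  SketchRunner(lambda x: 1 // x, ctx).process(0)
except ZeroDivisionError:
  pass
print(ctx.events)  # ['enter', 'exit', 'enter', 'exit']
```

Plain method calls keep the per-element cost low when compiled. The `finally` clause still guarantees `exit()` runs on every path, as a `with` block would.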
[3/4] incubator-beam git commit: Closes #706
Closes #706 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/d78b38d1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/d78b38d1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/d78b38d1 Branch: refs/heads/python-sdk Commit: d78b38d1dab75f65aa6dfc3bca53d58bafc1d37a Parents: 37f426f ecf9e3a Author: Robert Bradshaw Authored: Thu Jul 21 17:36:04 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 21 17:36:04 2016 -0700 -- sdks/python/apache_beam/runners/common.pxd | 9 +++- sdks/python/apache_beam/runners/common.py | 28 + 2 files changed, 27 insertions(+), 10 deletions(-) --
[2/4] incubator-beam git commit: Cache dofn.process method.
Cache dofn.process method. Saves another couple percent. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7c9d77ac Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7c9d77ac Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7c9d77ac Branch: refs/heads/python-sdk Commit: 7c9d77ac2509c7625c5709f885f8b5aadb4f9f74 Parents: f99aa77 Author: Robert Bradshaw <rober...@google.com> Authored: Thu Jul 21 12:11:03 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Thu Jul 21 17:36:04 2016 -0700 -- sdks/python/apache_beam/runners/common.pxd | 1 + sdks/python/apache_beam/runners/common.py | 5 - 2 files changed, 5 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index e855376..f01a362 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -23,6 +23,7 @@ cdef type SideOutputValue, TimestampedValue cdef class DoFnRunner(object): cdef object dofn + cdef object dofn_process cdef object window_fn cdef object context # TODO(robertwb): Make this a DoFnContext cdef object tagged_receivers http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7c9d77ac/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 134fb06..80db823 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -56,6 +56,7 @@ class DoFnRunner(object): step_name=None): if not args and not kwargs: self.dofn = fn + self.dofn_process = fn.process else: args, kwargs = util.insert_values_in_args(args, kwargs, side_inputs) @@ -70,6 +71,8 @@ class DoFnRunner(object): def finish_bundle(self, context): 
return fn.finish_bundle(context) self.dofn = CurriedFn() + self.dofn_process = lambda context: fn.process(context, *args, **kwargs) + self.window_fn = windowing.windowfn self.context = context self.tagged_receivers = tagged_receivers @@ -96,7 +99,7 @@ class DoFnRunner(object): def process(self, element): try: self.context.set_element(element) - self._process_outputs(element, self.dofn.process(self.context)) + self._process_outputs(element, self.dofn_process(self.context)) except BaseException as exn: self.reraise_augmented(exn)
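The commit resolves the per-element callable once, in the constructor, instead of looking up `self.dofn.process` for every element. Below is a minimal sketch of that idea. It assumes a DoFn-like object with a `process(context, ...)` method. `AppendSuffixFn` and `CachedRunner` are hypothetical names, not Beam classes.

```python
class AppendSuffixFn(object):
  """Hypothetical DoFn-like object; process returns a list of outputs."""

  def process(self, context, suffix=''):
    return [context + suffix]


class CachedRunner(object):
  """Sketch: resolve the per-element callable once, in the constructor."""

  def __init__(self, fn, args=(), kwargs=None):
    kwargs = kwargs or {}
    if not args and not kwargs:
      # Common case: cache the bound method, skipping an attribute
      # lookup (and bound-method creation) on every element.
      self.dofn_process = fn.process
    else:
      # Extra arguments: bake them into a closure once.
      self.dofn_process = lambda context: fn.process(context, *args, **kwargs)

  def process(self, element):
    return self.dofn_process(element)


fn = AppendSuffixFn()
print(CachedRunner(fn).process('beam'))  # ['beam']
print(CachedRunner(fn, ('!',)).process('beam'))  # ['beam!']
print(CachedRunner(fn, (), {'suffix': '?'}).process('beam'))  # ['beam?']
```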
[2/2] incubator-beam git commit: Closes #705
Closes #705 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e5006991 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e5006991 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e5006991 Branch: refs/heads/python-sdk Commit: e5006991d730f8fb222639f7c23b33cb8f046ce4 Parents: d78b38d 19082ad Author: Robert Bradshaw Authored: Thu Jul 21 17:41:06 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 21 17:41:06 2016 -0700 -- sdks/python/apache_beam/runners/common.pxd | 4 ++-- sdks/python/apache_beam/runners/common.py | 15 --- 2 files changed, 14 insertions(+), 5 deletions(-) --
[1/2] incubator-beam git commit: Minor cdef value changes.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk d78b38d1d -> e5006991d Minor cdef value changes. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/19082adb Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/19082adb Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/19082adb Branch: refs/heads/python-sdk Commit: 19082adb1522365b1f526e8c28a0a04a20744128 Parents: d78b38d Author: Robert Bradshaw Authored: Thu Jul 21 10:36:36 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 21 17:38:55 2016 -0700 -- sdks/python/apache_beam/runners/common.pxd | 4 ++-- sdks/python/apache_beam/runners/common.py | 15 --- 2 files changed, 14 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19082adb/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 7191659..e1bf461 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -39,11 +39,11 @@ cdef class DoFnContext(object): cdef object label cdef object state cdef WindowedValue windowed_value - cdef set_element(self, WindowedValue windowed_value) + cpdef set_element(self, WindowedValue windowed_value) cdef class Receiver(object): - cdef receive(self, WindowedValue windowed_value) + cpdef receive(self, WindowedValue windowed_value) cdef class LoggingContext(object): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/19082adb/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index a565645..c6b27bc 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -214,15 +214,24 @@ class DoFnContext(object): @property def element(self): -return self.windowed_value.value +if 
self.windowed_value is None: + raise AttributeError('element not accessible in this context') +else: + return self.windowed_value.value @property def timestamp(self): -return self.windowed_value.timestamp +if self.windowed_value is None: + raise AttributeError('timestamp not accessible in this context') +else: + return self.windowed_value.timestamp @property def windows(self): -return self.windowed_value.windows +if self.windowed_value is None: + raise AttributeError('windows not accessible in this context') +else: + return self.windowed_value.windows def aggregate_to(self, aggregator, input_value): self.state.counter_for(aggregator).update(input_value)
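After this change, reading `element`, `timestamp` or `windows` while no element is set raises an AttributeError with a descriptive message. That happens, for example, during start_bundle, where the runner calls `set_element(None)`; previously the access would fail on `None.value`. Below is a minimal sketch of the same accessor pattern. The namedtuple is a hypothetical stand-in for Beam's WindowedValue. One side effect worth knowing: because the error is an AttributeError, Python's `hasattr` reports the attribute as absent instead of propagating the error.

```python
import collections

# Hypothetical stand-in for apache_beam's WindowedValue.
WindowedValue = collections.namedtuple(
    'WindowedValue', ['value', 'timestamp', 'windows'])


class DoFnContext(object):
  """Sketch of a context whose element accessors fail clearly when unset."""

  def __init__(self, label):
    self.label = label
    self.windowed_value = None

  def set_element(self, windowed_value):
    self.windowed_value = windowed_value

  @property
  def element(self):
    if self.windowed_value is None:
      raise AttributeError('element not accessible in this context')
    return self.windowed_value.value

  @property
  def timestamp(self):
    if self.windowed_value is None:
      raise AttributeError('timestamp not accessible in this context')
    return self.windowed_value.timestamp


ctx = DoFnContext('step')
print(hasattr(ctx, 'element'))  # False: no element set yet
ctx.set_element(WindowedValue('a', 0, ()))
print(ctx.element)  # a
```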
[GitHub] incubator-beam pull request #708: Revert "Restore (faster) logging context."
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/708 Revert "Restore (faster) logging context." This fixes the Jenkins build that relies on not-yet-pushed Dataflow changes. --- This reverts commit ecf9e3a3cc3dcbb3413403d4558c95d4a0097350. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam no-no-logging Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/708.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #708 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[1/2] incubator-beam git commit: Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 351c3831d -> c155ef0eb Increased the GCS buffer size from 1MB to 8MB and introduced a 128kB buffer for the pipe. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a1f1fa06 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a1f1fa06 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a1f1fa06 Branch: refs/heads/python-sdk Commit: a1f1fa06ee8683273182548e7eb2d6612040d2bf Parents: 351c383 Author: Marian Dvorsky Authored: Thu Jul 28 13:02:15 2016 -0700 Committer: Marian Dvorsky Committed: Thu Jul 28 13:02:15 2016 -0700 -- sdks/python/apache_beam/io/gcsio.py | 30 +- 1 file changed, 21 insertions(+), 9 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a1f1fa06/sdks/python/apache_beam/io/gcsio.py -- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 9377266..88fcfb8 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -49,6 +49,7 @@ except ImportError: DEFAULT_READ_BUFFER_SIZE = 1024 * 1024 +WRITE_CHUNK_SIZE = 8 * 1024 * 1024 def parse_gcs_path(gcs_path): @@ -546,6 +547,10 @@ class GcsBufferedWriter(object): self.closed = False self.position = 0 +# A small buffer to avoid CPU-heavy per-write pipe calls. +self.write_buffer = bytearray() +self.write_buffer_size = 128 * 1024 + # Set up communication with uploading thread. parent_conn, child_conn = multiprocessing.Pipe() self.child_conn = child_conn @@ -557,7 +562,7 @@ class GcsBufferedWriter(object): bucket=self.bucket, name=self.name)) self.upload = transfer.Upload(GcsBufferedWriter.PipeStream(child_conn), - mime_type) + mime_type, chunksize=WRITE_CHUNK_SIZE) self.upload.strategy = transfer.RESUMABLE_UPLOAD # Start uploading thread. 
@@ -598,14 +603,10 @@ class GcsBufferedWriter(object): self._check_open() if not data: return -try: - self.conn.send_bytes(data) - self.position += len(data) -except IOError: - if self.upload_thread.last_error: -raise self.upload_thread.last_error # pylint: disable=raising-bad-type - else: -raise +self.write_buffer.extend(data) +if len(self.write_buffer) > self.write_buffer_size: + self._flush_write_buffer() +self.position += len(data) def tell(self): """Return the total number of bytes passed to write() so far.""" @@ -613,6 +614,7 @@ class GcsBufferedWriter(object): def close(self): """Close the current GCS file.""" +self._flush_write_buffer() self.closed = True self.conn.close() self.upload_thread.join() @@ -635,3 +637,13 @@ class GcsBufferedWriter(object): def writable(self): return True + + def _flush_write_buffer(self): +try: + self.conn.send_bytes(buffer(self.write_buffer)) + self.write_buffer = bytearray() +except IOError: + if self.upload_thread.last_error: +raise self.upload_thread.last_error # pylint: disable=raising-bad-type + else: +raise
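The writer now collects small writes in a 128 kB bytearray. It hands data to the pipe only once the buffer grows past that size, plus a final flush in `close()`, so the per-call cost of `send_bytes` is spread over many writes. Below is a minimal sketch of that buffering logic under a few assumptions. `send` is a hypothetical stand-in for `Connection.send_bytes`. The diff's `buffer()` is a Python 2 builtin, so the sketch uses `bytes()`. The upload-thread error handling is omitted.

```python
class BufferedSender(object):
  """Sketch of the write-buffering pattern from GcsBufferedWriter.

  `send` is a hypothetical stand-in for multiprocessing's
  Connection.send_bytes; any callable taking a bytes object works.
  """

  def __init__(self, send, buffer_size=128 * 1024):
    self.send = send
    self.buffer_size = buffer_size
    self.write_buffer = bytearray()
    self.position = 0

  def write(self, data):
    if not data:
      return
    self.write_buffer.extend(data)
    # Only call the (comparatively expensive) sender once the buffer
    # has grown past the threshold.
    if len(self.write_buffer) > self.buffer_size:
      self._flush_write_buffer()
    self.position += len(data)

  def tell(self):
    return self.position

  def close(self):
    # Anything still buffered must be sent before closing.
    self._flush_write_buffer()

  def _flush_write_buffer(self):
    self.send(bytes(self.write_buffer))
    self.write_buffer = bytearray()


chunks = []
writer = BufferedSender(chunks.append, buffer_size=10)
for _ in range(5):
  writer.write(b'abcd')
writer.close()
print([len(c) for c in chunks])  # [12, 8]
```

With a 10-byte threshold, the third 4-byte write pushes the buffer to 12 bytes and triggers a send. The remaining 8 bytes go out on `close()`.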
[2/2] incubator-beam git commit: Closes #752
Closes #752 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c155ef0e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c155ef0e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c155ef0e Branch: refs/heads/python-sdk Commit: c155ef0ebc88ac34a3681b8bff6152e1857da847 Parents: 351c383 a1f1fa0 Author: Robert Bradshaw Authored: Thu Jul 28 18:17:43 2016 -0700 Committer: Robert Bradshaw Committed: Thu Jul 28 18:17:43 2016 -0700 -- sdks/python/apache_beam/io/gcsio.py | 30 +- 1 file changed, 21 insertions(+), 9 deletions(-) --
[GitHub] incubator-beam pull request #748: Optimize Map and Flatmap when there are no...
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/748 Optimize Map and Flatmap when there are no side inputs. Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- varargs and kwargs are expensive, even when they're empty. This is especially true for otherwise one-argument Python calls which are special cased in CPython. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam fast-noarg Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/748.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #748 commit 257d3c449b68fbf842838c95bb2744bb517d02a9 Author: Robert Bradshaw <rober...@google.com> Date: 2016-07-28T01:29:59Z Optimize Map and Flatmap when there are no side inputs. varargs and kwargs are expensive, even when they're empty. This is especially true for otherwise one-argument Python calls which are special cased in CPython.
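The PR argues that forwarding `*args, **kwargs` costs time even when both are empty. The reason it gives is that CPython has fast paths for plain positional calls. Below is a minimal sketch of the general remedy: decide the call shape once, up front, and return the function itself when there are no extra arguments. `make_mapper` is a hypothetical helper, not the code in PR #748, and no timings are claimed here.

```python
def make_mapper(fn, args=(), kwargs=None):
  """Return a one-argument callable applying fn to each element.

  Hypothetical helper: with no extra arguments, fn is returned as-is,
  so every per-element call is a plain one-argument call.
  """
  kwargs = kwargs or {}
  if not args and not kwargs:
    return fn
  # Only pay for argument unpacking when there is something to unpack.
  return lambda element: fn(element, *args, **kwargs)


def add(x, y=0):
  return x + y


fast = make_mapper(add)
slow = make_mapper(add, (10,))
print(fast is add)  # True: no wrapper needed
print([fast(1), slow(1)])  # [1, 11]
```

To check the cost claim on a given interpreter, `timeit` can compare `add(1)` against `add(1, *(), **{})`.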
[GitHub] incubator-beam pull request #770: Implement add_input for all CombineFns.
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/770 Implement add_input for all CombineFns. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam add-input Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/770.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #770 commit 3ebf28c6e0d17af3720076e33f88a0f126a89059 Author: Robert Bradshaw <rober...@gmail.com> Date: 2016-07-26T08:15:55Z Implement add_input for all CombineFns.
[1/3] incubator-beam git commit: Implement add_input for all CombineFns.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk e834fa82b -> 65152cab8 Implement add_input for all CombineFns. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3ebf28c6 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3ebf28c6 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3ebf28c6 Branch: refs/heads/python-sdk Commit: 3ebf28c6e0d17af3720076e33f88a0f126a89059 Parents: e834fa8 Author: Robert Bradshaw Authored: Tue Jul 26 01:15:55 2016 -0700 Committer: Robert Bradshaw Committed: Tue Aug 2 15:52:28 2016 -0700 -- sdks/python/apache_beam/transforms/combiners.py | 16 sdks/python/apache_beam/transforms/core.py | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/combiners.py -- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index 155dcc6..c3f0da1 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -132,6 +132,9 @@ class CountCombineFn(core.CombineFn): def create_accumulator(self): return 0 + def add_input(self, accumulator, element): +return accumulator + 1 + def add_inputs(self, accumulator, elements): return accumulator + len(elements) @@ -425,9 +428,9 @@ class _TupleCombineFnBase(core.CombineFn): class TupleCombineFn(_TupleCombineFnBase): - def add_inputs(self, accumulator, elements): -return [c.add_inputs(a, e) -for c, a, e in zip(self._combiners, accumulator, zip(*elements))] + def add_input(self, accumulator, element): +return [c.add_input(a, e) +for c, a, e in zip(self._combiners, accumulator, element)] def with_common_input(self): return SingleInputTupleCombineFn(*self._combiners) @@ -435,8 +438,8 @@ class TupleCombineFn(_TupleCombineFnBase): class 
SingleInputTupleCombineFn(_TupleCombineFnBase): - def add_inputs(self, accumulator, elements): -return [c.add_inputs(a, elements) + def add_input(self, accumulator, element): +return [c.add_input(a, element) for c, a in zip(self._combiners, accumulator)] @@ -522,9 +525,6 @@ def curry_combine_fn(fn, args, kwargs): def add_input(self, accumulator, element): return fn.add_input(accumulator, element, *args, **kwargs) - def add_inputs(self, accumulator, elements): -return fn.add_inputs(accumulator, elements, *args, **kwargs) - def merge_accumulators(self, accumulators): return fn.merge_accumulators(accumulators, *args, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3ebf28c6/sdks/python/apache_beam/transforms/core.py -- diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index 38b9cd2..da26205 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -270,7 +270,7 @@ class CombineFn(WithTypeHints): 1. Input values are partitioned into one or more batches. 2. For each batch, the create_accumulator method is invoked to create a fresh initial "accumulator" value representing the combination of zero values. - 3. For each input value in the batch, the add_inputs method is invoked to + 3. For each input value in the batch, the add_input method is invoked to combine more values with the accumulator for that batch. 4. The merge_accumulators method is invoked to combine accumulators from separate batches into a single combined output accumulator value, once all @@ -296,7 +296,7 @@ class CombineFn(WithTypeHints): def add_input(self, accumulator, element, *args, **kwargs): """Return result of folding element into accumulator. -CombineFn implementors must override either add_input or add_inputs. +CombineFn implementors must override add_input. 
Args: accumulator: the current accumulator @@ -420,7 +420,7 @@ class CallableWrapperCombineFn(CombineFn): if accumulator is self._EMPTY: return self._fn(elements, *args, **kwargs) elif isinstance(elements, (list, tuple)): - return self._fn([accumulator] + elements, *args, **kwargs) + return self._fn([accumulator] + list(elements), *args, **kwargs) else: def union(): yield accumulator
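With this change `add_input` becomes the single method implementors override. Batches are folded in one element at a time, following the numbered steps in the CombineFn docstring: create an accumulator, add inputs, merge accumulators, extract the output. Below is a minimal sketch of that lifecycle. `CombineFnSketch` and `combine` are hypothetical stand-ins, not Beam's `core.CombineFn`. `CountCombineFn` mirrors the diff, including its optional sized-batch `add_inputs` fast path.

```python
class CombineFnSketch(object):
  """Hypothetical minimal CombineFn: subclasses override add_input."""

  def create_accumulator(self):
    raise NotImplementedError

  def add_input(self, accumulator, element):
    raise NotImplementedError

  def add_inputs(self, accumulator, elements):
    # Default batch behaviour: fold each element in with add_input.
    for element in elements:
      accumulator = self.add_input(accumulator, element)
    return accumulator

  def merge_accumulators(self, accumulators):
    raise NotImplementedError

  def extract_output(self, accumulator):
    return accumulator


class CountCombineFn(CombineFnSketch):

  def create_accumulator(self):
    return 0

  def add_input(self, accumulator, element):
    return accumulator + 1

  def add_inputs(self, accumulator, elements):
    # Optional fast path for sized batches, as in the diff.
    return accumulator + len(elements)

  def merge_accumulators(self, accumulators):
    return sum(accumulators)


def combine(fn, batches):
  """Run create/add/merge/extract over already-partitioned batches."""
  accumulators = []
  for batch in batches:
    acc = fn.create_accumulator()
    for element in batch:
      acc = fn.add_input(acc, element)
    accumulators.append(acc)
  return fn.extract_output(fn.merge_accumulators(accumulators))


print(combine(CountCombineFn(), [['a', 'b'], ['c']]))  # 3
```

The related change in CallableWrapperCombineFn (`[accumulator] + list(elements)`) matters because `elements` may be a tuple. In Python, concatenating a list and a tuple with `+` raises TypeError.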
[2/3] incubator-beam git commit: Document TupleCombineFns
Document TupleCombineFns Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a2239d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a2239d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a2239d3 Branch: refs/heads/python-sdk Commit: 4a2239d3701e13622998c71107d263c8966e73e1 Parents: 3ebf28c Author: Robert Bradshaw Authored: Wed Aug 3 13:52:36 2016 -0700 Committer: Robert Bradshaw Committed: Wed Aug 3 13:52:36 2016 -0700 -- sdks/python/apache_beam/transforms/combiners.py | 12 1 file changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a2239d3/sdks/python/apache_beam/transforms/combiners.py -- diff --git a/sdks/python/apache_beam/transforms/combiners.py b/sdks/python/apache_beam/transforms/combiners.py index c3f0da1..a0604b8 100644 --- a/sdks/python/apache_beam/transforms/combiners.py +++ b/sdks/python/apache_beam/transforms/combiners.py @@ -427,6 +427,12 @@ class _TupleCombineFnBase(core.CombineFn): class TupleCombineFn(_TupleCombineFnBase): + """A combiner for combining tuples via a tuple of combiners. + + Takes as input a tuple of N CombineFns and combines N-tuples by + combining the k-th element of each tuple with the k-th CombineFn, + outputting a new N-tuple of combined values. + """ def add_input(self, accumulator, element): return [c.add_input(a, e) @@ -437,6 +443,12 @@ class TupleCombineFn(_TupleCombineFnBase): class SingleInputTupleCombineFn(_TupleCombineFnBase): + """A combiner for combining a single value via a tuple of combiners. + + Takes as input a tuple of N CombineFns and combines elements by + applying each CombineFn to each input, producing an N-tuple of + the outputs corresponding to each of the N CombineFn's outputs. + """ def add_input(self, accumulator, element): return [c.add_input(a, element)
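The two docstrings describe different input shapes. TupleCombineFn splits each N-tuple across its N combiners, while SingleInputTupleCombineFn feeds the whole element to every combiner. Below is a minimal sketch of the contrast. `SumFn`, `MaxFn`, `TupleCombine`, `SingleInputTupleCombine` and `run` are hypothetical stand-ins, not the Beam classes, and `merge_accumulators` is omitted for brevity.

```python
class SumFn(object):
  def create_accumulator(self):
    return 0

  def add_input(self, accumulator, element):
    return accumulator + element

  def extract_output(self, accumulator):
    return accumulator


class MaxFn(object):
  def create_accumulator(self):
    return None

  def add_input(self, accumulator, element):
    return element if accumulator is None else max(accumulator, element)

  def extract_output(self, accumulator):
    return accumulator


class TupleCombine(object):
  """The k-th combiner combines the k-th component of each input tuple."""

  def __init__(self, *combiners):
    self._combiners = combiners

  def create_accumulator(self):
    return [c.create_accumulator() for c in self._combiners]

  def add_input(self, accumulator, element):
    return [c.add_input(a, e)
            for c, a, e in zip(self._combiners, accumulator, element)]

  def extract_output(self, accumulator):
    return tuple(c.extract_output(a)
                 for c, a in zip(self._combiners, accumulator))


class SingleInputTupleCombine(TupleCombine):
  """Every combiner sees the same, whole input element."""

  def add_input(self, accumulator, element):
    return [c.add_input(a, element)
            for c, a in zip(self._combiners, accumulator)]


def run(fn, elements):
  acc = fn.create_accumulator()
  for element in elements:
    acc = fn.add_input(acc, element)
  return fn.extract_output(acc)


print(run(TupleCombine(SumFn(), MaxFn()), [(1, 5), (2, 3)]))  # (3, 5)
print(run(SingleInputTupleCombine(SumFn(), MaxFn()), [4, 9, 2]))  # (15, 9)
```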
[GitHub] incubator-beam pull request #702: Add tests for WindowedValue.
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/702 Add tests for WindowedValue. I just realized I forgot to add this file when doing #691 . You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam wv-tests Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/702.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #702 commit 59bea7c04a5d4ae2ee69ab0ef11401a0063583a4 Author: Robert Bradshaw <rober...@google.com> Date: 2016-07-20T21:10:11Z Add tests for WindowedValue.
[2/2] incubator-beam git commit: Closes #694
Closes #694 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/22126285 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/22126285 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/22126285 Branch: refs/heads/python-sdk Commit: 2212628585d6bfc331954b40f8c2823878ab3fce Parents: 71214b3 7baa662 Author: Robert Bradshaw Authored: Wed Jul 20 11:54:20 2016 -0700 Committer: Robert Bradshaw Committed: Wed Jul 20 11:54:20 2016 -0700 -- sdks/python/apache_beam/utils/dependency.py | 6 +- 1 file changed, 1 insertion(+), 5 deletions(-) --
[4/5] incubator-beam git commit: Clarifying comments.
Clarifying comments. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9d1fc85d Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9d1fc85d Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9d1fc85d Branch: refs/heads/python-sdk Commit: 9d1fc85d5352f16ce8f0f092865decc6b296dbb8 Parents: 2768981 Author: Robert Bradshaw <rober...@google.com> Authored: Wed Jul 20 12:24:18 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Jul 20 13:06:21 2016 -0700 -- sdks/python/apache_beam/runners/common.py | 1 + sdks/python/apache_beam/utils/windowed_value.py | 12 ++-- 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d1fc85d/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 059359c..ef28c63 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -190,6 +190,7 @@ class DoFnState(object): self.step_name, aggregator) +# TODO(robertwb): Replace core.DoFnContext with this. class DoFnContext(object): def __init__(self, label, element=None, state=None): http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9d1fc85d/sdks/python/apache_beam/utils/windowed_value.py -- diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py index 5172b36..4c50e72 100644 --- a/sdks/python/apache_beam/utils/windowed_value.py +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -72,11 +72,16 @@ class WindowedValue(object): # We'd rather implement __eq__, but Cython supports that via __richcmp__ # instead. Fortunately __cmp__ is understood by both (but not by Python 3). def __cmp__(left, right): # pylint: disable=no-self-argument +"""Compares left and right for equality. 
+ +For performance reasons, doesn't actually impose an ordering +on unequal values (always returning 1). +""" if type(left) is not type(right): return cmp(type(left), type(right)) else: - # Don't bother paying the cost of a total ordering. # TODO(robertwb): Avoid the type checks? + # Returns False (0) if equal, and True (1) if not. return not WindowedValue._typed_eq(left, right) @staticmethod @@ -111,4 +116,7 @@ def create(value, timestamp_micros, windows): try: WindowedValue.timestamp_object = None except TypeError: - pass # Cythonized class already has this default value. + # When we're compiled, we can't dynamically add attributes to + # the cdef class, but in this case it's OK as it's already present + # on each instance. + pass
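The documented `__cmp__` only answers "equal or not". It returns `not WindowedValue._typed_eq(left, right)`, which is 0 when the values are equal and 1 otherwise, so `==` and `!=` are meaningful but `<` is not. As the code comment notes, Python 3 ignores `__cmp__`. Below is a Python 3 sketch of the same equality-only contract using `__eq__` and `__hash__` instead. That is a swapped-in technique, not the commit's code, and `Windowed` is a hypothetical class name.

```python
class Windowed(object):
  """Python 3 sketch: equality-only comparison, no total ordering."""

  def __init__(self, value, timestamp_micros, windows):
    self.value = value
    self.timestamp_micros = timestamp_micros
    self.windows = windows

  def __eq__(self, other):
    if type(self) is not type(other):
      return NotImplemented
    # Cheapest field first, mirroring _typed_eq.
    return (self.timestamp_micros == other.timestamp_micros
            and self.value == other.value
            and self.windows == other.windows)

  # Python 3 derives != from __eq__, and defining __eq__ without
  # __hash__ would make instances unhashable, so define it explicitly.
  def __hash__(self):
    return (hash(self.value) + 3 * self.timestamp_micros
            + 7 * hash(self.windows))


a = Windowed('x', 10, ('w',))
b = Windowed('x', 10, ('w',))
print(a == b, a != Windowed('y', 10, ('w',)))  # True True
print(len({a, b}))  # 1
```

Since no ordering methods are defined, `a < b` raises TypeError in Python 3. That is the explicit form of "doesn't impose an ordering".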
[3/5] incubator-beam git commit: Cythonize WindowedValue class.
t, WindowedValue right) except? -2 + +@cython.locals(wv=WindowedValue) +cdef inline WindowedValue create( + object value, int64_t timestamp_micros, object windows) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/dc42467f/sdks/python/apache_beam/utils/windowed_value.py -- diff --git a/sdks/python/apache_beam/utils/windowed_value.py b/sdks/python/apache_beam/utils/windowed_value.py new file mode 100644 index 000..5172b36 --- /dev/null +++ b/sdks/python/apache_beam/utils/windowed_value.py @@ -0,0 +1,114 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Core windowing data structures. +""" + +# This module is carefully crafted to have optimal performance when +# compiled while still being valid Python. Care needs to be taken when +# editing this file as WindowedValues are created for every element for +# every step in a Beam pipeline. + +#cython: profile=True + +from apache_beam.transforms.timeutil import MAX_TIMESTAMP +from apache_beam.transforms.timeutil import MIN_TIMESTAMP +from apache_beam.transforms.timeutil import Timestamp + + +class WindowedValue(object): + """A windowed value having a value, a timestamp and set of windows. + + Attributes: +value: The underlying value of a windowed value. 
+timestamp: Timestamp associated with the value as seconds since Unix epoch. +windows: A set (iterable) of window objects for the value. The window + objects are descendants of the BoundedWindow class. + """ + + def __init__(self, value, timestamp, windows): +# For performance reasons, only timestamp_micros is stored by default +# (as a C int). The Timestamp object is created on demand below. +self.value = value +if isinstance(timestamp, int): + self.timestamp_micros = timestamp * 1000000 +else: + self.timestamp_object = (timestamp if isinstance(timestamp, Timestamp) + else Timestamp.of(timestamp)) + self.timestamp_micros = self.timestamp_object.micros +self.windows = windows + + @property + def timestamp(self): +if self.timestamp_object is None: + self.timestamp_object = Timestamp(0, self.timestamp_micros) +return self.timestamp_object + + def __repr__(self): +return '(%s, %s, %s)' % ( +repr(self.value), +'MIN_TIMESTAMP' if self.timestamp == MIN_TIMESTAMP else +'MAX_TIMESTAMP' if self.timestamp == MAX_TIMESTAMP else +float(self.timestamp), +self.windows) + + def __hash__(self): +return hash(self.value) + 3 * self.timestamp_micros + 7 * hash(self.windows) + + # We'd rather implement __eq__, but Cython supports that via __richcmp__ + # instead. Fortunately __cmp__ is understood by both (but not by Python 3). + def __cmp__(left, right): # pylint: disable=no-self-argument +if type(left) is not type(right): + return cmp(type(left), type(right)) +else: + # Don't bother paying the cost of a total ordering. + # TODO(robertwb): Avoid the type checks? + return not WindowedValue._typed_eq(left, right) + + @staticmethod + def _typed_eq(left, right): +if (left.timestamp_micros == right.timestamp_micros +and left.value == right.value +and left.windows == right.windows): + return True +else: + return False + + def with_value(self, new_value): +"""Creates a new WindowedValue with the same timestamps and windows as this. + +This is the fastest way to create a new WindowedValue. 
+""" +return create(new_value, self.timestamp_micros, self.windows) + + def __reduce__(self): +return WindowedValue, (self.value, self.timestamp, self.windows) + + +# TODO(robertwb): Move this to a static method. +def create(value, timestamp_micros, windows): + wv = WindowedValue.__new__(WindowedValue) + wv.value = value + wv.timestamp_micros = timestamp_micros + wv.windows = windows + return wv + + +try: + WindowedValue.timestamp_object = None +except TypeError: + pass # Cythonized class already has this default value. http://git-wip-us.apache.org/repos/asf/incubator-bea
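Two performance tricks in this file are worth isolating. First, only integer microseconds are stored, and the Timestamp object is built lazily on first access, then cached. Second, the module-level `create()` bypasses `__init__` through `__new__`, which is why `with_value` is the fast path. Below is a minimal pure-Python sketch of both. `Timestamp` here is a hypothetical stand-in, not `apache_beam.transforms.timeutil.Timestamp`. Integer timestamps are treated as seconds and scaled by 1,000,000 to microseconds. The class-level `timestamp_object = None` plays the role of the file's post-definition `WindowedValue.timestamp_object = None`.

```python
class Timestamp(object):
  """Hypothetical stand-in for a timestamp object built from micros."""

  def __init__(self, micros):
    self.micros = micros

  def seconds(self):
    return self.micros / 1e6


class WindowedValue(object):

  # Class-level default so instances made by create(), which skips
  # __init__, still read timestamp_object as None.
  timestamp_object = None

  def __init__(self, value, timestamp, windows):
    self.value = value
    if isinstance(timestamp, int):
      # Integer timestamps are seconds since the epoch.
      self.timestamp_micros = timestamp * 1000000
    else:
      self.timestamp_object = timestamp
      self.timestamp_micros = timestamp.micros
    self.windows = windows

  @property
  def timestamp(self):
    # Build the Timestamp only when first asked for, then cache it.
    if self.timestamp_object is None:
      self.timestamp_object = Timestamp(self.timestamp_micros)
    return self.timestamp_object

  def with_value(self, new_value):
    return create(new_value, self.timestamp_micros, self.windows)


def create(value, timestamp_micros, windows):
  # Bypass __init__ entirely: no isinstance checks per element.
  wv = WindowedValue.__new__(WindowedValue)
  wv.value = value
  wv.timestamp_micros = timestamp_micros
  wv.windows = windows
  return wv


wv = WindowedValue('a', 5, ('global',))
wv2 = wv.with_value('b')
print(wv2.value, wv2.timestamp_micros)  # b 5000000
print(wv2.timestamp.seconds())  # 5.0
```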
[5/5] incubator-beam git commit: Add Cython DoFnContext and Receiver stubs.
Add Cython DoFnContext and Receiver stubs. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/8efc231c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/8efc231c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/8efc231c Branch: refs/heads/python-sdk Commit: 8efc231c867fcf5267ba21a8a33ee306b87d9945 Parents: dc42467 Author: Robert Bradshaw <rober...@google.com> Authored: Tue Jul 19 12:22:08 2016 -0700 Committer: Robert Bradshaw <rober...@google.com> Committed: Wed Jul 20 13:06:21 2016 -0700 -- sdks/python/apache_beam/runners/common.pxd | 20 --- sdks/python/apache_beam/runners/common.py | 34 - 2 files changed, 50 insertions(+), 4 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.pxd -- diff --git a/sdks/python/apache_beam/runners/common.pxd b/sdks/python/apache_beam/runners/common.pxd index 480c056..e855376 100644 --- a/sdks/python/apache_beam/runners/common.pxd +++ b/sdks/python/apache_beam/runners/common.pxd @@ -15,17 +15,31 @@ # limitations under the License. 
# -cdef type SideOutputValue, TimestampedValue, WindowedValue +from apache_beam.utils.windowed_value cimport WindowedValue + +cdef type SideOutputValue, TimestampedValue + cdef class DoFnRunner(object): cdef object dofn cdef object window_fn - cdef object context + cdef object context # TODO(robertwb): Make this a DoFnContext cdef object tagged_receivers cdef object logger cdef object step_name - cdef object main_receivers + cdef object main_receivers # TODO(robertwb): Make this a Receiver cpdef _process_outputs(self, element, results) + + +cdef class DoFnContext(object): + cdef object label + cdef object state + cdef WindowedValue windowed_value + cdef set_element(self, WindowedValue windowed_value) + + +cdef class Receiver(object): + cdef receive(self, WindowedValue windowed_value) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8efc231c/sdks/python/apache_beam/runners/common.py -- diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 3c0c3f6..059359c 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -25,8 +25,8 @@ from apache_beam.internal import util from apache_beam.pvalue import SideOutputValue from apache_beam.transforms import core from apache_beam.transforms.window import TimestampedValue -from apache_beam.transforms.window import WindowedValue from apache_beam.transforms.window import WindowFn +from apache_beam.utils.windowed_value import WindowedValue class FakeLogger(object): @@ -188,3 +188,35 @@ class DoFnState(object): """Looks up the counter for this aggregator, creating one if necessary.""" return self._counter_factory.get_aggregator_counter( self.step_name, aggregator) + + +class DoFnContext(object): + + def __init__(self, label, element=None, state=None): +self.label = label +self.state = state +if element is not None: + self.set_element(element) + + def set_element(self, windowed_value): +self.windowed_value = windowed_value + + 
@property + def element(self): +return self.windowed_value.value + + @property + def timestamp(self): +return self.windowed_value.timestamp + + @property + def windows(self): +return self.windowed_value.windows + + def aggregate_to(self, aggregator, input_value): +self.state.counter_for(aggregator).update(input_value) + + +class Receiver(object): + def receive(self, windowed_value): +raise NotImplementedError
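The DoFnContext stub wraps a single mutable windowed value. `element`, `timestamp`, and `windows` are read through to it, so one context object can be reused for every element. The following is a rough, self-contained sketch of how a runner loop might use such a context together with a Receiver subclass. `WV`, `ListReceiver`, and `run_elements` are hypothetical names for illustration and do not appear in the commit.

```python
from collections import namedtuple

# Hypothetical stand-in for a windowed value: (value, timestamp, windows).
WV = namedtuple('WV', ['value', 'timestamp', 'windows'])


class Context(object):
    """Reusable context; the runner swaps in each element via set_element."""

    def __init__(self, label):
        self.label = label
        self.windowed_value = None

    def set_element(self, windowed_value):
        self.windowed_value = windowed_value

    @property
    def element(self):
        return self.windowed_value.value

    @property
    def timestamp(self):
        return self.windowed_value.timestamp

    @property
    def windows(self):
        return self.windowed_value.windows


class Receiver(object):
    def receive(self, windowed_value):
        raise NotImplementedError


class ListReceiver(Receiver):
    """Collects everything it receives (illustrative only)."""

    def __init__(self):
        self.received = []

    def receive(self, windowed_value):
        self.received.append(windowed_value)


def run_elements(process, elements, receiver, label='step'):
    # A single context object is reused across all elements instead of
    # allocating a fresh one per element.
    context = Context(label)
    for wv in elements:
        context.set_element(wv)
        for output in process(context):
            # In this sketch, outputs inherit the input's timestamp and windows.
            receiver.receive(WV(output, context.timestamp, context.windows))
```

Typing `windowed_value` and `receive` against a concrete cdef class in the `.pxd`, as the stubs do, appears intended to let Cython compile these per-element calls as C-level calls instead of Python attribute lookups.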
[2/5] incubator-beam git commit: Reduce the number of elements in the pvalue caching test.
Reduce the number of elements in the pvalue caching test. It seems this test is causing travis-ci to time out, as the non-compiled version got slightly slower. 100,000 elements should be sufficient to see the effects of not caching. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/27689819 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/27689819 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/27689819 Branch: refs/heads/python-sdk Commit: 276898195819329a1b88002a41a109df61530a26 Parents: 8efc231 Author: Robert Bradshaw Authored: Wed Jul 20 01:37:49 2016 -0700 Committer: Robert Bradshaw Committed: Wed Jul 20 13:06:21 2016 -0700 -- sdks/python/apache_beam/pipeline_test.py | 18 ++ 1 file changed, 10 insertions(+), 8 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/27689819/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index c1db5cb..86ae45f 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -207,11 +207,13 @@ class PipelineTest(unittest.TestCase): yield o yield SideOutputValue('side', o) +num_elements = 100000 + pipeline = Pipeline('DirectPipelineRunner') gc.collect() count_threshold = len(gc.get_objects()) + 1 -biglist = pipeline | Create('oom:create', ['x'] * 1000000) +biglist = pipeline | Create('oom:create', ['x'] * num_elements) dupes = ( biglist | Map('oom:addone', lambda x: (x, 1)) @@ -223,17 +225,17 @@ class PipelineTest(unittest.TestCase): | CombinePerKey('oom:combine', sum) | Map('oom:check', check_memory, count_threshold)) -assert_that(result, equal_to([('x', 3000000)])) +assert_that(result, equal_to([('x', 3 * num_elements)])) pipeline.run() self.assertEqual( pipeline.runner.debug_counters['element_counts'], { -'oom:flatten': 3000000, -('oom:combine/GroupByKey/reify_windows', None): 3000000,
-('oom:dupes/oom:dupes', 'side'): 1000000, -('oom:dupes/oom:dupes', None): 1000000, -'oom:create': 1000000, -('oom:addone', None): 1000000, +'oom:flatten': 3 * num_elements, +('oom:combine/GroupByKey/reify_windows', None): 3 * num_elements, +('oom:dupes/oom:dupes', 'side'): num_elements, +('oom:dupes/oom:dupes', None): num_elements, +'oom:create': num_elements, +('oom:addone', None): num_elements, 'oom:combine/GroupByKey/group_by_key': 1, ('oom:check', None): 1, 'assert_that/singleton': 1,
[1/5] incubator-beam git commit: Closes #691
Repository: incubator-beam Updated Branches: refs/heads/python-sdk a643e2009 -> 37f426faf Closes #691 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37f426fa Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37f426fa Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37f426fa Branch: refs/heads/python-sdk Commit: 37f426faf5c47d5b99d100292fd0cde6a640939e Parents: a643e20 9d1fc85 Author: Robert Bradshaw Authored: Wed Jul 20 13:06:21 2016 -0700 Committer: Robert Bradshaw Committed: Wed Jul 20 13:06:21 2016 -0700 -- sdks/python/apache_beam/pipeline_test.py| 18 +-- sdks/python/apache_beam/runners/common.pxd | 20 ++- sdks/python/apache_beam/runners/common.py | 35 +- sdks/python/apache_beam/transforms/window.py| 37 +- .../python/apache_beam/utils/windowed_value.pxd | 38 ++ sdks/python/apache_beam/utils/windowed_value.py | 122 +++ sdks/python/setup.py| 1 + 7 files changed, 223 insertions(+), 48 deletions(-) --
[GitHub] incubator-beam pull request #676: Remove pipeline.apply(pvalue, callable)
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/676 Remove pipeline.apply(pvalue, callable) Be sure to do all of the following to help us incorporate your contribution quickly and easily: - [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request` - [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes). - [ ] Replace `` in the title with the actual Jira issue number, if there is one. - [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt). --- You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam no-callable Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/676.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #676 commit dd0b437fdd6ffa13f758dfd8daed4ee6aa4fd758 Author: Robert Bradshaw <rober...@google.com> Date: 2016-07-18T17:23:24Z Remove pipeline.apply(pvalue, callable) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] incubator-beam pull request #679: Add >> operator for labeling PTransforms.
GitHub user robertwb opened a pull request: https://github.com/apache/incubator-beam/pull/679 Add >> operator for labeling PTransforms. Also propagates label passed into PValue.apply(...) method. You can merge this pull request into a Git repository by running: $ git pull https://github.com/robertwb/incubator-beam rshift Alternatively you can review and apply these changes as the patch at: https://github.com/apache/incubator-beam/pull/679.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #679 commit 6c89554ec014440ffd778e58bc1010ac68afbc91 Author: Robert Bradshaw <rober...@google.com> Date: 2016-07-18T19:46:54Z Add >> operator for labeling PTransforms. Also propagates label passed into PValue.apply(...) method.
[2/2] incubator-beam git commit: Closes #683
Closes #683 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/062af66f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/062af66f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/062af66f Branch: refs/heads/python-sdk Commit: 062af66f83329485d06c6dc68afa29b6a39a91b1 Parents: 5610905 fdab1f9 Author: Robert Bradshaw Authored: Mon Jul 18 23:41:14 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 23:41:14 2016 -0700 -- sdks/python/apache_beam/coders/coders.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) --
[1/2] incubator-beam git commit: Fix comment.
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 561090580 -> 062af66f8 Fix comment. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/fdab1f94 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/fdab1f94 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/fdab1f94 Branch: refs/heads/python-sdk Commit: fdab1f9458eb8ad8b0210b8ccc54f5a44960f3bc Parents: 5610905 Author: Robert Bradshaw Authored: Mon Jul 18 23:40:25 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 23:40:25 2016 -0700 -- sdks/python/apache_beam/coders/coders.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/fdab1f94/sdks/python/apache_beam/coders/coders.py -- diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py index 1c5043c..a5ed7f9 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -384,9 +384,9 @@ class FastPrimitivesCoder(FastCoder): return value - # We allow .key_coder() and .value_coder() to be called on PickleCoder since - # we can't always infer the return values of lambdas in ParDo operations, the - # result of which may be used in a GroupBykey. + # We allow .key_coder() and .value_coder() to be called on FastPrimitivesCoder + # since we can't always infer the return values of lambdas in ParDo + # operations, the result of which may be used in a GroupBykey. def is_kv_coder(self): return True
[2/2] incubator-beam git commit: Remove pipeline.apply(pvalue, callable)
Remove pipeline.apply(pvalue, callable) Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e68eb05e Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e68eb05e Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e68eb05e Branch: refs/heads/python-sdk Commit: e68eb05e5ffa366f83478807bbac3af83ea8cda5 Parents: 5daab7f Author: Robert Bradshaw Authored: Mon Jul 18 10:23:24 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 17:46:16 2016 -0700 -- sdks/python/apache_beam/dataflow_test.py| 5 +++-- sdks/python/apache_beam/pipeline.py | 22 +++- sdks/python/apache_beam/pipeline_test.py| 18 .../python/apache_beam/transforms/ptransform.py | 2 ++ 4 files changed, 8 insertions(+), 39 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/dataflow_test.py -- diff --git a/sdks/python/apache_beam/dataflow_test.py b/sdks/python/apache_beam/dataflow_test.py index c4933af..9bbb5ff 100644 --- a/sdks/python/apache_beam/dataflow_test.py +++ b/sdks/python/apache_beam/dataflow_test.py @@ -23,6 +23,7 @@ import logging import re import unittest +import apache_beam as beam from apache_beam.pipeline import Pipeline from apache_beam.pvalue import AsDict from apache_beam.pvalue import AsIter as AllOf @@ -51,8 +52,8 @@ class DataflowTest(unittest.TestCase): # TODO(silviuc): Figure out a nice way to specify labels for stages so that # internal steps get prepended with surorunding stage names. - @staticmethod - def Count(pcoll): # pylint: disable=invalid-name + @beam.ptransform_fn + def Count(pcoll): # pylint: disable=invalid-name, no-self-argument """A Count transform: v, ...
=> (v, n), ...""" return (pcoll | Map('AddCount', lambda x: (x, 1)) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline.py -- diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index 012d4d9..bc1feb2 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -47,7 +47,6 @@ import logging import os import shutil import tempfile -import types from apache_beam import pvalue from apache_beam import typehints @@ -187,20 +186,17 @@ class Pipeline(object): """Applies a custom transform using the pvalueish specified. Args: - transform: the PTranform (or callable) to apply. + transform: the PTranform to apply. pvalueish: the input for the PTransform (typically a PCollection). Raises: TypeError: if the transform object extracted from the argument list is -not a callable type or a descendant from PTransform. +not a PTransform. RuntimeError: if the transform object was already applied to this pipeline and needs to be cloned in order to apply again. """ if not isinstance(transform, ptransform.PTransform): - if isinstance(transform, (type, types.ClassType)): -raise TypeError("%s is not a PTransform instance, did you mean %s()?" -% (transform, transform.__name__)) - transform = _CallableWrapperPTransform(transform) + raise TypeError("Expected a PTransform object, got %s" % transform) full_label = format_full_label(self._current_transform(), transform) if full_label in self.applied_labels: @@ -286,18 +282,6 @@ class Pipeline(object): return pvalueish_result -class _CallableWrapperPTransform(ptransform.PTransform): - - def __init__(self, callee): -assert callable(callee) -super(_CallableWrapperPTransform, self).__init__( -label=getattr(callee, '__name__', 'Callable')) -self._callee = callee - - def apply(self, *args, **kwargs): -return self._callee(*args, **kwargs) - - class PipelineVisitor(object): """Visitor pattern class used to traverse a DAG of transforms. 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e68eb05e/sdks/python/apache_beam/pipeline_test.py -- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 8598737..04cd2ee 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -32,7 +32,6 @@ from apache_beam.transforms import Create from apache_beam.transforms import FlatMap from apache_beam.transforms import Flatten from apache_beam.transforms import Map -from apache_beam.transforms import
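After this change, `Pipeline.apply` no longer wraps arbitrary callables. Anything that is not a PTransform now raises `TypeError`, and function-style transforms such as `Count` in dataflow_test.py are converted with a decorator instead. The following is a loose, self-contained sketch of that pattern. `PTransformLike`, `apply`, and `transform_fn` are hypothetical stand-ins, not Beam's `PTransform` or `ptransform_fn`, whose internals may differ.

```python
class PTransformLike(object):
    """Toy base class; subclasses implement expand()."""
    label = None

    def expand(self, pvalue):
        raise NotImplementedError


def apply(transform, pvalue):
    # Plain callables are rejected instead of being wrapped implicitly.
    if not isinstance(transform, PTransformLike):
        raise TypeError('Expected a PTransform object, got %s' % transform)
    return transform.expand(pvalue)


def transform_fn(fn):
    """Decorator: turns fn(pvalue, *args) into a class of transform objects."""
    class _FnTransform(PTransformLike):
        def __init__(self, *args, **kwargs):
            self.label = fn.__name__
            self._args = args
            self._kwargs = kwargs

        def expand(self, pvalue):
            return fn(pvalue, *self._args, **self._kwargs)

    return _FnTransform


@transform_fn
def Count(values):
    # Counts occurrences of each value, e.g. ['a', 'b', 'a'] -> [('a', 2), ('b', 1)].
    counts = {}
    for v in values:
        counts[v] = counts.get(v, 0) + 1
    return sorted(counts.items())
```

Requiring an explicit transform object gives every applied step a `label`, which the pipeline uses to check that labels are unique.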
[1/2] incubator-beam git commit: Add >> operator for labeling PTransforms.
cm.exception.message, 'Transform "CustomTransform" does not have a stable unique label. ' 'This will prevent updating of pipelines. ' -'To clone a transform with a new label use: ' -'transform.clone("NEW LABEL").') +'To apply a transform with a specified label write ' +'pvalue | "label" >> transform') def test_reuse_cloned_custom_transform_instance(self): pipeline = Pipeline(self.runner_name) -pcoll1 = pipeline | Create('pcoll1', [1, 2, 3]) -pcoll2 = pipeline | Create('pcoll2', [4, 5, 6]) +pcoll1 = pipeline | 'pc1' >> Create([1, 2, 3]) +pcoll2 = pipeline | 'pc2' >> Create([4, 5, 6]) transform = PipelineTest.CustomTransform() result1 = pcoll1 | transform -result2 = pcoll2 | transform.clone('new label') +result2 = pcoll2 | 'new_label' >> transform assert_that(result1, equal_to([2, 3, 4]), label='r1') assert_that(result2, equal_to([5, 6, 7]), label='r2') pipeline.run() http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a740f827/sdks/python/apache_beam/pvalue.py -- diff --git a/sdks/python/apache_beam/pvalue.py b/sdks/python/apache_beam/pvalue.py index 6fc3041..063d0b5 100644 --- a/sdks/python/apache_beam/pvalue.py +++ b/sdks/python/apache_beam/pvalue.py @@ -79,10 +79,8 @@ class PValue(object): optional first label and a transform/callable object. It will call the pipeline.apply() method with this modified argument list. """ -if isinstance(args[0], str): - # TODO(robertwb): Make sure labels are properly passed during - # ptransform construction and drop this argument. 
- args = args[1:] +if isinstance(args[0], basestring): + kwargs['label'], args = args[0], args[1:] arglist = list(args) arglist.insert(1, self) return self.pipeline.apply(*arglist, **kwargs) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a740f827/sdks/python/apache_beam/transforms/ptransform.py -- diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py index 1457bec..bde05b5 100644 --- a/sdks/python/apache_beam/transforms/ptransform.py +++ b/sdks/python/apache_beam/transforms/ptransform.py @@ -369,6 +369,9 @@ class PTransform(WithTypeHints): # TODO(robertwb): Assert all input WindowFns compatible. return inputs[0].windowing + def __rrshift__(self, label): +return _NamedPTransform(self, label) + def __or__(self, right): """Used to compose PTransforms, e.g., ptransform1 | ptransform2.""" if isinstance(right, PTransform): @@ -676,24 +679,6 @@ def ptransform_fn(fn): return CallablePTransform(fn) -def format_full_label(applied_transform, pending_transform): - """Returns a fully formatted cumulative PTransform label. - - Args: -applied_transform: An instance of an AppliedPTransform that has been fully - applied prior to 'pending_transform'. -pending_transform: An instance of PTransform that has yet to be applied to - the Pipeline. - - Returns: -A fully formatted PTransform label. Example: '/foo/bar/baz'. - """ - label = '/'.join([applied_transform.full_label, pending_transform.label]) - # Remove leading backslash because the monitoring UI expects names that do not - # start with such a character. 
- return label if not label.startswith('/') else label[1:] - - def label_from_callable(fn): if hasattr(fn, 'default_label'): return fn.default_label() @@ -706,3 +691,13 @@ def label_from_callable(fn): return fn.__name__ else: return str(fn) + + +class _NamedPTransform(PTransform): + + def __init__(self, transform, label): +super(_NamedPTransform, self).__init__(label) +self.transform = transform + + def apply(self, pvalue): +raise RuntimeError("Should never be applied directly.")
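The new `__rrshift__` hook is what makes `pvalue | 'label' >> transform` work. `str` does not implement `>>` for a PTransform operand, so Python falls back to the right operand's reflected method and builds a named wrapper. Because `>>` binds more tightly than `|`, the wrapper is created before it is applied. The following is a toy, self-contained sketch of the mechanism. `Transform`, `NamedTransform`, `AddOne`, and `Collection` are hypothetical names; the real `_NamedPTransform` is resolved by the pipeline rather than applied directly.

```python
class Transform(object):
    """Toy transform; the label defaults to the class name."""

    def __init__(self, label=None):
        self.label = label or type(self).__name__

    def __rrshift__(self, label):
        # Invoked for `'some label' >> transform`, since str has no
        # __rshift__ that accepts a Transform.
        return NamedTransform(self, label)

    def expand(self, values):
        raise NotImplementedError


class NamedTransform(Transform):
    """Pairs a transform with an explicit label and delegates the work."""

    def __init__(self, transform, label):
        super(NamedTransform, self).__init__(label)
        self.transform = transform

    def expand(self, values):
        return self.transform.expand(values)


class AddOne(Transform):
    def expand(self, values):
        return [v + 1 for v in values]


class Collection(object):
    """Toy collection that tracks the labels of applied transforms."""

    def __init__(self, values, applied_labels):
        self.values = values
        self.applied_labels = applied_labels

    def __or__(self, transform):
        # Reusing a label is an error, as in the pipeline's stable-label check.
        if transform.label in self.applied_labels:
            raise RuntimeError(
                'Transform "%s" does not have a stable unique label.'
                % transform.label)
        self.applied_labels.add(transform.label)
        return Collection(transform.expand(self.values), self.applied_labels)
```

In this sketch, one `AddOne()` instance can be applied twice as long as the second application is given a distinct label (`pc | 'second' >> t`). This replaces the `transform.clone('new label')` idiom in the updated test.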
[GitHub] incubator-beam pull request #679: Add >> operator for labeling PTransforms.
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/679
[GitHub] incubator-beam pull request #676: Remove pipeline.apply(pvalue, callable)
Github user robertwb closed the pull request at: https://github.com/apache/incubator-beam/pull/676
[1/2] incubator-beam git commit: Closes #676
Repository: incubator-beam Updated Branches: refs/heads/python-sdk 5daab7fb3 -> ad7c216f4 Closes #676 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/ad7c216f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/ad7c216f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/ad7c216f Branch: refs/heads/python-sdk Commit: ad7c216f4949cb098ab7eacaefc68c22f6a16bad Parents: 5daab7f e68eb05 Author: Robert Bradshaw Authored: Mon Jul 18 17:46:16 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 17:46:16 2016 -0700 -- sdks/python/apache_beam/dataflow_test.py| 5 +++-- sdks/python/apache_beam/pipeline.py | 22 +++- sdks/python/apache_beam/pipeline_test.py| 18 .../python/apache_beam/transforms/ptransform.py | 2 ++ 4 files changed, 8 insertions(+), 39 deletions(-) --
[2/2] incubator-beam git commit: Fixing broken example tests
Fixing broken example tests Nose does not pick up decorated tests unless the decorator name starts with test_. This resulted in some tests being inadvertently disabled. Also OptionsContext (https://github.com/aaltay/incubator-beam/blob/python-sdk/sdks/python/apache_beam/utils/options.py#L457) is not doing what it is supposed to do. Its augment_options() method is not called, so using OptionsContext does not override the option values as expected. I will remove this class and a few uses of it in tests in a follow-up. Finally, removed contains_in_any_order in favor of equal_to (as discussed in: https://github.com/apache/incubator-beam/pull/650#discussion_r70906340). Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3bfd6813 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3bfd6813 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3bfd6813 Branch: refs/heads/python-sdk Commit: 3bfd6813960eb8b6161bac79b96349e4c721bfea Parents: ad7c216 Author: Ahmet Altay Authored: Mon Jul 18 14:24:51 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 17:47:53 2016 -0700 -- .../examples/complete/autocomplete_test.py | 4 +- .../examples/complete/estimate_pi_test.py | 1 + .../examples/cookbook/group_with_coder_test.py | 70 sdks/python/apache_beam/transforms/util.py | 13 4 files changed, 45 insertions(+), 43 deletions(-) -- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bfd6813/sdks/python/apache_beam/examples/complete/autocomplete_test.py -- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 1b3ee5f..84f947b 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -22,7 +22,7 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import
autocomplete from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import contains_in_any_order +from apache_beam.transforms.util import equal_to class AutocompleteTest(unittest.TestCase): @@ -35,7 +35,7 @@ class AutocompleteTest(unittest.TestCase): result = words | autocomplete.TopPerPrefix('test', 5) # values must be hashable for now result = result | beam.Map(lambda (k, vs): (k, tuple(vs))) -assert_that(result, contains_in_any_order( +assert_that(result, equal_to( [ ('t', ((3, 'to'), (2, 'this'), (1, 'that'))), ('to', ((3, 'to'), )), http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bfd6813/sdks/python/apache_beam/examples/complete/estimate_pi_test.py -- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index 7ca82d7..c633bb1 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -17,6 +17,7 @@ """Test for the estimate_pi example.""" +import logging import unittest import apache_beam as beam http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3bfd6813/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py -- diff --git a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py index 07211a9..fb52809 100644 --- a/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py +++ b/sdks/python/apache_beam/examples/cookbook/group_with_coder_test.py @@ -18,13 +18,10 @@ """Test for the custom coders example.""" import logging +import tempfile import unittest -import apache_beam as beam from apache_beam.examples.cookbook import group_with_coder -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to -from apache_beam.utils.options import OptionsContext # Patch group_with_coder.PlayerCoder.decode(). 
To test that the PlayerCoder was @@ -39,37 +36,54 @@ class GroupWithCoderTest(unittest.TestCase): 'joe,20', 'fred,6', 'ann,5', 'joe,30', 'ann,10', 'mary,1'] - @OptionsContext(pipeline_type_check=True) - def test_basics_with_type_check_n(self): -# Run the workflow with pipeline_type_check option. This will make sure + def create_temp_file(self, records): +with tempfile.NamedTemporaryFile(delete=False) as f: + for record in records: +
[1/2] incubator-beam git commit: Closes #685
Repository: incubator-beam Updated Branches: refs/heads/python-sdk ad7c216f4 -> 69f895a2e Closes #685 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/69f895a2 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/69f895a2 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/69f895a2 Branch: refs/heads/python-sdk Commit: 69f895a2e7f5a4ef39c060cfb0320c87e64ea8be Parents: ad7c216 3bfd681 Author: Robert Bradshaw Authored: Mon Jul 18 17:47:53 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 17:47:53 2016 -0700 -- .../examples/complete/autocomplete_test.py | 4 +- .../examples/complete/estimate_pi_test.py | 1 + .../examples/cookbook/group_with_coder_test.py | 70 sdks/python/apache_beam/transforms/util.py | 13 4 files changed, 45 insertions(+), 43 deletions(-) --
[2/2] incubator-beam git commit: Closes #679
Closes #679 Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/56109058 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/56109058 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/56109058 Branch: refs/heads/python-sdk Commit: 5610905803e03bafd575daf340c8e38113bb78cd Parents: b00f915 a740f82 Author: Robert Bradshaw Authored: Mon Jul 18 17:59:00 2016 -0700 Committer: Robert Bradshaw Committed: Mon Jul 18 17:59:00 2016 -0700 -- sdks/python/apache_beam/dataflow_test.py| 2 -- sdks/python/apache_beam/pipeline.py | 14 + sdks/python/apache_beam/pipeline_test.py| 10 +++ sdks/python/apache_beam/pvalue.py | 6 ++-- .../python/apache_beam/transforms/ptransform.py | 31 5 files changed, 29 insertions(+), 34 deletions(-) --
[4/4] incubator-beam git commit: Implement coder optimized for coding primitives.
ders.py b/sdks/python/apache_beam/coders/coders.py index 619586f..cf5ca6d 100644 --- a/sdks/python/apache_beam/coders/coders.py +++ b/sdks/python/apache_beam/coders/coders.py @@ -354,6 +354,49 @@ class DeterministicPickleCoder(FastCoder): return self +class FastPrimitivesCoder(FastCoder): + """Encodes simple primitives (e.g. str, int) efficiently. + + For unknown types, falls back to another coder (e.g. PickleCoder). + """ + def __init__(self, fallback_coder): +self._fallback_coder = fallback_coder + + def _create_impl(self): +return coder_impl.FastPrimitivesCoderImpl( +self._fallback_coder.get_impl()) + + def is_deterministic(self): +return self._fallback_coder.is_deterministic() + + def as_cloud_object(self, is_pair_like=True): +value = super(FastCoder, self).as_cloud_object() +# We currently use this coder in places where we cannot infer the coder to +# use for the value type in a more granular way. In places where the +# service expects a pair, it checks for the "is_pair_like" key, in which +# case we would fail without the hack below. +if is_pair_like: + value['is_pair_like'] = True + value['component_encodings'] = [ + self.as_cloud_object(is_pair_like=False), + self.as_cloud_object(is_pair_like=False) + ] + +return value + + # We allow .key_coder() and .value_coder() to be called on PickleCoder since + # we can't always infer the return values of lambdas in ParDo operations, the + # result of which may be used in a GroupBykey. + def is_kv_coder(self): +return True + + def key_coder(self): +return self + + def value_coder(self): +return self + + class Base64PickleCoder(Coder): """Coder of objects by Python pickle, then base64 encoding.""" # TODO(robertwb): Do base64 encoding where it's needed (e.g. 
in json) rather http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f58c2bda/sdks/python/apache_beam/coders/coders_test_common.py -- diff --git a/sdks/python/apache_beam/coders/coders_test_common.py b/sdks/python/apache_beam/coders/coders_test_common.py index 0266fdc..b084947 100644 --- a/sdks/python/apache_beam/coders/coders_test_common.py +++ b/sdks/python/apache_beam/coders/coders_test_common.py @@ -111,6 +111,14 @@ class CodersTest(unittest.TestCase): coders.TupleCoder((coders.VarIntCoder(), coders.DillCoder())), (1, cell_value)) + def test_fast_primitives_coder(self): +coder = coders.FastPrimitivesCoder(coders.SingletonCoder(len)) +self.check_coder(coder, None, 1, -1, 1.5, 'str\0str', u'unicode\0\u0101') +self.check_coder(coder, (), (1, 2, 3)) +self.check_coder(coder, [], [1, 2, 3]) +self.check_coder(coder, len) +self.check_coder(coders.TupleCoder((coder,)), ('a',), (1,)) + def test_bytes_coder(self): self.check_coder(coders.BytesCoder(), 'a', '\0', 'z' * 1000)
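FastPrimitivesCoder encodes common primitives directly and hands anything else to a fallback coder. The following is a simplified sketch of that idea under stated assumptions: a one-byte type tag followed by a payload, with `pickle` as the fallback. `PrimitivesCoder` and the tag values are hypothetical. They do not reproduce `FastPrimitivesCoderImpl`'s actual wire format, and the sketch targets Python 3 `str`/`bytes`.

```python
import pickle
import struct

# Hypothetical one-byte type tags; not Beam's actual wire format.
NONE, INT, FLOAT, BYTES, UNICODE, FALLBACK = range(6)


class PrimitivesCoder(object):
    """Encodes a few primitive types directly; anything else via a fallback."""

    def __init__(self, fallback_encode=pickle.dumps, fallback_decode=pickle.loads):
        self._fallback_encode = fallback_encode
        self._fallback_decode = fallback_decode

    def encode(self, value):
        t = type(value)  # exact type check, so bool and subclasses fall back
        if value is None:
            return bytes([NONE])
        if t is int and -2**63 <= value < 2**63:
            return bytes([INT]) + struct.pack('>q', value)  # signed 64-bit
        if t is float:
            return bytes([FLOAT]) + struct.pack('>d', value)  # IEEE double
        if t is bytes:
            return bytes([BYTES]) + value
        if t is str:
            return bytes([UNICODE]) + value.encode('utf-8')
        return bytes([FALLBACK]) + self._fallback_encode(value)

    def decode(self, encoded):
        tag, payload = encoded[0], encoded[1:]
        if tag == NONE:
            return None
        if tag == INT:
            return struct.unpack('>q', payload)[0]
        if tag == FLOAT:
            return struct.unpack('>d', payload)[0]
        if tag == BYTES:
            return payload
        if tag == UNICODE:
            return payload.decode('utf-8')
        if tag == FALLBACK:
            return self._fallback_decode(payload)
        raise ValueError('Unknown type tag: %r' % tag)
```

Any value routed through the fallback inherits the fallback's properties. This is why the coder in the diff delegates `is_deterministic()` to its fallback coder.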
[1/4] incubator-beam git commit: Closes #683
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 69f895a2e -> b00f915ee

Closes #683

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/b00f915e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/b00f915e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/b00f915e

Branch: refs/heads/python-sdk
Commit: b00f915eeeb5bca9addda2a9a5ff500e81954d80
Parents: 69f895a 4c51273
Author: Robert Bradshaw
Authored: Mon Jul 18 17:48:33 2016 -0700
Committer: Robert Bradshaw
Committed: Mon Jul 18 17:48:33 2016 -0700

--
 sdks/python/apache_beam/coders/coder_impl.pxd  | 10 +++
 sdks/python/apache_beam/coders/coder_impl.py   | 80
 sdks/python/apache_beam/coders/coders.py       | 43 +++
 .../apache_beam/coders/coders_test_common.py   |  9 +++
 sdks/python/apache_beam/coders/typecoders.py   |  4 +-
 5 files changed, 144 insertions(+), 2 deletions(-)
--
[2/4] incubator-beam git commit: Used fast primitives coder as fallback coder.
Used fast primitives coder as fallback coder.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64457d04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64457d04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64457d04

Branch: refs/heads/python-sdk
Commit: 64457d04f60859c7f0ef53c9d847090a9fa754e4
Parents: f58c2bd
Author: Robert Bradshaw
Authored: Mon Jul 18 14:25:08 2016 -0700
Committer: Robert Bradshaw
Committed: Mon Jul 18 17:48:33 2016 -0700

--
 sdks/python/apache_beam/coders/coders.py     | 2 +-
 sdks/python/apache_beam/coders/typecoders.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/coders.py
--
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index cf5ca6d..1c5043c 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -359,7 +359,7 @@ class FastPrimitivesCoder(FastCoder):

   For unknown types, falls back to another coder (e.g. PickleCoder).
   """
-  def __init__(self, fallback_coder):
+  def __init__(self, fallback_coder=PickleCoder()):
     self._fallback_coder = fallback_coder

   def _create_impl(self):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64457d04/sdks/python/apache_beam/coders/typecoders.py
--
diff --git a/sdks/python/apache_beam/coders/typecoders.py b/sdks/python/apache_beam/coders/typecoders.py
index 2fba752..74e5770 100644
--- a/sdks/python/apache_beam/coders/typecoders.py
+++ b/sdks/python/apache_beam/coders/typecoders.py
@@ -86,9 +86,9 @@ class CoderRegistry(object):
     self._register_coder_internal(bytes, coders.BytesCoder)
     self._register_coder_internal(unicode, coders.StrUtf8Coder)
     self._register_coder_internal(typehints.TupleConstraint, coders.TupleCoder)
+    self._fallback_coder = fallback_coder or coders.FastPrimitivesCoder
     self._register_coder_internal(typehints.AnyTypeConstraint,
-                                  coders.PickleCoder)
-    self._fallback_coder = fallback_coder or coders.PickleCoder
+                                  self._fallback_coder)

   def _register_coder_internal(self, typehint_type, typehint_coder_class):
     self._coders[typehint_type] = typehint_coder_class
[GitHub] incubator-beam pull request #691: Add low-level data structures to be shared...
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/691

Add low-level data structures to be shared with the worker.

Be sure to do all of the following to help us incorporate your contribution quickly and easily:

- [ ] Make sure the PR title is formatted like: `[BEAM-] Description of pull request`
- [ ] Make sure tests pass via `mvn clean verify`. (Even better, enable Travis-CI on your fork and ensure the whole test matrix passes).
- [ ] Replace `` in the title with the actual Jira issue number, if there is one.
- [ ] If this contribution is large, please file an Apache [Individual Contributor License Agreement](https://www.apache.org/licenses/icla.txt).

---
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam cdef-structs

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/691.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #691

commit 3653a2b378d848fdc557e008111769f52cad1a23
Author: Robert Bradshaw <rober...@google.com>
Date: 2016-07-19T19:19:47Z

Cythonize WindowedValue class.

commit ec33ed4b71ca94faa59e399c4a701df6a11e21d6
Author: Robert Bradshaw <rober...@google.com>
Date: 2016-07-19T19:22:08Z

Add Cython DoFnContext and Receiver stubs.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
[GitHub] incubator-beam pull request #696: Cythonize WindowedValue class.
GitHub user robertwb opened a pull request:

https://github.com/apache/incubator-beam/pull/696

Cythonize WindowedValue class.

---
You can merge this pull request into a Git repository by running:

$ git pull https://github.com/robertwb/incubator-beam cdef-structs-wv-only

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/incubator-beam/pull/696.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

This closes #696

commit 862e0650985b4a6436abf49197710cb87037111a
Author: Robert Bradshaw <rober...@google.com>
Date: 2016-07-19T19:19:47Z

Cythonize WindowedValue class.
[1/2] incubator-beam git commit: Closes #599
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk d898d56ae -> 1305c108a

Closes #599

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/1305c108
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/1305c108
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/1305c108

Branch: refs/heads/python-sdk
Commit: 1305c108a63cb1675fd9bc36fa990bb87033eaeb
Parents: d898d56 c9c31fd
Author: Robert Bradshaw
Authored: Fri Jul 15 10:23:13 2016 -0700
Committer: Robert Bradshaw
Committed: Fri Jul 15 10:23:13 2016 -0700

--
 sdks/python/apache_beam/io/filebasedsource.py     |  9 +
 sdks/python/apache_beam/io/fileio.py              | 14 ++
 sdks/python/apache_beam/io/gcsio.py               | 15 +++
 sdks/python/apache_beam/io/gcsio_test.py          |  8
 sdks/python/apache_beam/io/range_trackers.py      |  3 ++-
 sdks/python/apache_beam/io/range_trackers_test.py |  3 +++
 6 files changed, 43 insertions(+), 9 deletions(-)
--
[2/2] incubator-beam git commit: Fixes several issues related to 'filebasedsource'.
Fixes several issues related to 'filebasedsource'.

Adds a method 'fileio.ChannelFactory.size_in_bytes()' that can be used to determine the size of a single file.
Implements this method for 'ChannelFactory' implementations for GCS and local files.
Updates 'filebasedsource' to use this method when determining size of files.
Fixes a small bug in 'OffsetRangeTracker'.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9c31fdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9c31fdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9c31fdf

Branch: refs/heads/python-sdk
Commit: c9c31fdffbe2365a8dedd3154726ab1c01cfa889
Parents: d898d56
Author: Chamikara Jayalath
Authored: Wed Jul 6 20:25:04 2016 -0700
Committer: Robert Bradshaw
Committed: Fri Jul 15 10:23:13 2016 -0700

--
 sdks/python/apache_beam/io/filebasedsource.py     |  9 +
 sdks/python/apache_beam/io/fileio.py              | 14 ++
 sdks/python/apache_beam/io/gcsio.py               | 15 +++
 sdks/python/apache_beam/io/gcsio_test.py          |  8
 sdks/python/apache_beam/io/range_trackers.py      |  3 ++-
 sdks/python/apache_beam/io/range_trackers_test.py |  3 +++
 6 files changed, 43 insertions(+), 9 deletions(-)
--

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/filebasedsource.py
--
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index c877e44..aa0820d 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,6 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """

 from multiprocessing.pool import ThreadPool
-import os

 import range_trackers
 from apache_beam.io import fileio
@@ -131,13 +130,7 @@ class FileBasedSource(iobase.BoundedSource):
   def _estimate_sizes_in_parallel(file_names):

     def _calculate_size_of_file(file_name):
-      f = fileio.ChannelFactory.open(
-          file_name, 'rb', 'application/octet-stream')
-      try:
-        f.seek(0, os.SEEK_END)
-        return f.tell()
-      finally:
-        f.close()
+      return fileio.ChannelFactory.size_in_bytes(file_name)

     return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map(
         _calculate_size_of_file, file_names)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/fileio.py
--
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 31b6a93..f532077 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -255,6 +255,20 @@ class ChannelFactory(object):
     else:
       return glob.glob(path)

+  @staticmethod
+  def size_in_bytes(path):
+    """Returns the size of a file in bytes.
+
+    Args:
+      path: a string that gives the path of a single file.
+    """
+    if path.startswith('gs://'):
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.io import gcsio
+      return gcsio.GcsIO().size(path)
+    else:
+      return os.path.getsize(path)
+

 class _CompressionType(object):
   """Object representing single compression type."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio.py
--
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index c61f251..10409c9 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -236,6 +236,21 @@ class GcsIO(object):
     except IOError:
       return False

+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def size(self, path):
+    """Returns the size of a single GCS object.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: size of the GCS object in bytes.
+    """
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(bucket=bucket,
+                                               object=object_path)
+    return self.client.objects.Get(request).size
+

 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio_test.py
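The filebasedsource.py change replaces "open the channel, seek to the end, read the offset" with a direct size lookup. Local paths use `os.path.getsize`; `gs://` paths use a single object-metadata request via `GcsIO.size`. Below is a self-contained sketch of the local half plus the thread-pool fan-out. The GCS branch is deliberately stubbed out. `MAX_NUM_THREADS` is a hypothetical stand-in for `MAX_NUM_THREADS_FOR_SIZE_ESTIMATION`, whose value this diff does not show.

```python
import os
from multiprocessing.pool import ThreadPool

# Hypothetical stand-in for MAX_NUM_THREADS_FOR_SIZE_ESTIMATION.
MAX_NUM_THREADS = 4


def size_in_bytes(path):
  """Returns the size of a single (non-glob) file in bytes."""
  if path.startswith('gs://'):
    # The patched fileio calls gcsio.GcsIO().size(path) here; stubbed out
    # so this sketch runs without GCS access.
    raise NotImplementedError('GCS size lookup is not part of this sketch')
  return os.path.getsize(path)


def estimate_sizes_in_parallel(file_names):
  """Looks up file sizes concurrently; results keep the input order."""
  pool = ThreadPool(MAX_NUM_THREADS)
  try:
    return pool.map(size_in_bytes, file_names)
  finally:
    pool.close()
    pool.join()
```

Threads suit this workload because each lookup is I/O-bound, and `ThreadPool.map` returns results in input order. Unlike the one-liner in the diff, the sketch also closes and joins its pool explicitly.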
[GitHub] incubator-beam pull request #657: Accept runners by fully qualified name.
Github user robertwb closed the pull request at:

https://github.com/apache/incubator-beam/pull/657