Repository: beam Updated Branches: refs/heads/master 8f9f5f1a6 -> ffd99cb43
Add template examples to snippets.py Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdf7e330 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdf7e330 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdf7e330 Branch: refs/heads/master Commit: fdf7e330743d7e62a25f9e7d2d5ea219843a87f1 Parents: 8f9f5f1 Author: Maria Garcia Herrero <mari...@google.com> Authored: Thu May 25 18:21:46 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Thu Jun 1 09:29:56 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/snippets/snippets.py | 86 ++++++++++++++++++++ .../examples/snippets/snippets_test.py | 14 +++- 2 files changed, 99 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/fdf7e330/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 70929e9..3a5f9b1 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -502,6 +502,51 @@ def examples_wordcount_wordcount(renames): p.visit(SnippetUtils.RenameFiles(renames)) +def examples_wordcount_templated(renames): + """Templated WordCount example snippet.""" + import re + + import apache_beam as beam + from apache_beam.io import ReadFromText + from apache_beam.io import WriteToText + from apache_beam.options.pipeline_options import PipelineOptions + + # [START example_wordcount_templated] + class WordcountTemplatedOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + # Use add_value_provider_argument for arguments to be templatable + # Use add_argument as usual for non-templatable arguments + parser.add_value_provider_argument( + '--input', + help='Path of the file to read from') + parser.add_argument( + '--output', + required=True, + help='Output file to write results to.') + pipeline_options = PipelineOptions(['--output', 'some/output_path']) + p = beam.Pipeline(options=pipeline_options) + + wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions) + lines = p | 'Read' >> ReadFromText(wordcount_options.input) + # [END example_wordcount_templated] + + ( + lines + | 'ExtractWords' >> beam.FlatMap( + lambda x: re.findall(r'[A-Za-z\']+', x)) + | 'PairWithOnes' >> beam.Map(lambda x: (x, 1)) + | 'Group' >> beam.GroupByKey() + | 'Sum' >> beam.Map(lambda (word, ones): (word, sum(ones))) + | 'Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c)) + | 'Write' >> WriteToText(wordcount_options.output) + ) + + p.visit(SnippetUtils.RenameFiles(renames)) + result = p.run() + result.wait_until_finish() + + def examples_wordcount_debugging(renames): """DebuggingWordCount example snippets.""" import re @@ -569,6 +614,47 @@ def examples_wordcount_debugging(renames): p.visit(SnippetUtils.RenameFiles(renames)) +def examples_ptransforms_templated(renames): + # [START examples_ptransforms_templated] + import apache_beam as beam + from apache_beam.io import WriteToText + from apache_beam.options.pipeline_options import PipelineOptions + from apache_beam.options.value_provider import StaticValueProvider + + class TemplatedUserOptions(PipelineOptions): + @classmethod + def _add_argparse_args(cls, parser): + parser.add_value_provider_argument('--templated_int', type=int) + + class MySumFn(beam.DoFn): + def __init__(self, templated_int): + self.templated_int = templated_int + + def process(self, an_int): + yield self.templated_int.get() + an_int + + pipeline_options = PipelineOptions() + p = beam.Pipeline(options=pipeline_options) + + user_options = pipeline_options.view_as(TemplatedUserOptions) + my_sum_fn = MySumFn(user_options.templated_int) + sum = (p + | 'ReadCollection' >> beam.io.ReadFromText( + 'gs://some/integer_collection') + | 'StringToInt' >> beam.Map(lambda w: int(w)) + | 'AddGivenInt' >> beam.ParDo(my_sum_fn) + | 'WriteResultingCollection' >> WriteToText('some/output_path')) + # [END examples_ptransforms_templated] + + # Templates are not supported by DirectRunner (only by DataflowRunner) + # so a value must be provided at graph-construction time + my_sum_fn.templated_int = StaticValueProvider(int, 10) + + p.visit(SnippetUtils.RenameFiles(renames)) + result = p.run() + result.wait_until_finish() + + import apache_beam as beam from apache_beam.io import iobase from apache_beam.io.range_trackers import OffsetRangeTracker http://git-wip-us.apache.org/repos/asf/beam/blob/fdf7e330/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index e302465..9183d0d 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -636,7 +636,8 @@ class SnippetsTest(unittest.TestCase): def test_examples_wordcount(self): pipelines = [snippets.examples_wordcount_minimal, snippets.examples_wordcount_wordcount, - snippets.pipeline_monitoring] + snippets.pipeline_monitoring, + snippets.examples_wordcount_templated] for pipeline in pipelines: temp_path = self.create_temp_file( @@ -647,6 +648,17 @@ class SnippetsTest(unittest.TestCase): self.get_output(result_path), ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1']) + def test_examples_ptransforms_templated(self): + pipelines = [snippets.examples_ptransforms_templated] + + for pipeline in pipelines: + temp_path = self.create_temp_file('1\n 2\n 3') + result_path = self.create_temp_file() + pipeline({'read': temp_path, 'write': result_path}) + self.assertEqual( + self.get_output(result_path), + ['11', '12', '13']) + def test_examples_wordcount_debugging(self): temp_path = self.create_temp_file( 'Flourish Flourish Flourish stomach abc def')