Repository: beam
Updated Branches:
  refs/heads/master 8f9f5f1a6 -> ffd99cb43


Add template examples to snippets.py


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/fdf7e330
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/fdf7e330
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/fdf7e330

Branch: refs/heads/master
Commit: fdf7e330743d7e62a25f9e7d2d5ea219843a87f1
Parents: 8f9f5f1
Author: Maria Garcia Herrero <mari...@google.com>
Authored: Thu May 25 18:21:46 2017 -0700
Committer: Ahmet Altay <al...@google.com>
Committed: Thu Jun 1 09:29:56 2017 -0700

----------------------------------------------------------------------
 .../apache_beam/examples/snippets/snippets.py   | 86 ++++++++++++++++++++
 .../examples/snippets/snippets_test.py          | 14 +++-
 2 files changed, 99 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/fdf7e330/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 70929e9..3a5f9b1 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -502,6 +502,51 @@ def examples_wordcount_wordcount(renames):
     p.visit(SnippetUtils.RenameFiles(renames))
 
 
+def examples_wordcount_templated(renames):
+  """Templated WordCount example snippet."""
+  import re
+
+  import apache_beam as beam
+  from apache_beam.io import ReadFromText
+  from apache_beam.io import WriteToText
+  from apache_beam.options.pipeline_options import PipelineOptions
+
+  # [START example_wordcount_templated]
+  class WordcountTemplatedOptions(PipelineOptions):
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      # Use add_value_provider_argument for arguments to be templatable
+      # Use add_argument as usual for non-templatable arguments
+      parser.add_value_provider_argument(
+          '--input',
+          help='Path of the file to read from')
+      parser.add_argument(
+          '--output',
+          required=True,
+          help='Output file to write results to.')
+  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
+  p = beam.Pipeline(options=pipeline_options)
+
+  wordcount_options = pipeline_options.view_as(WordcountTemplatedOptions)
+  lines = p | 'Read' >> ReadFromText(wordcount_options.input)
+  # [END example_wordcount_templated]
+
+  (
+      lines
+      | 'ExtractWords' >> beam.FlatMap(
+          lambda x: re.findall(r'[A-Za-z\']+', x))
+      | 'PairWithOnes' >> beam.Map(lambda x: (x, 1))
+      | 'Group' >> beam.GroupByKey()
+      | 'Sum' >> beam.Map(lambda (word, ones): (word, sum(ones)))
+      | 'Format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
+      | 'Write' >> WriteToText(wordcount_options.output)
+  )
+
+  p.visit(SnippetUtils.RenameFiles(renames))
+  result = p.run()
+  result.wait_until_finish()
+
+
 def examples_wordcount_debugging(renames):
   """DebuggingWordCount example snippets."""
   import re
@@ -569,6 +614,47 @@ def examples_wordcount_debugging(renames):
     p.visit(SnippetUtils.RenameFiles(renames))
 
 
+def examples_ptransforms_templated(renames):
+  # [START examples_ptransforms_templated]
+  import apache_beam as beam
+  from apache_beam.io import WriteToText
+  from apache_beam.options.pipeline_options import PipelineOptions
+  from apache_beam.options.value_provider import StaticValueProvider
+
+  class TemplatedUserOptions(PipelineOptions):
+    @classmethod
+    def _add_argparse_args(cls, parser):
+      parser.add_value_provider_argument('--templated_int', type=int)
+
+  class MySumFn(beam.DoFn):
+    def __init__(self, templated_int):
+      self.templated_int = templated_int
+
+    def process(self, an_int):
+      yield self.templated_int.get() + an_int
+
+  pipeline_options = PipelineOptions()
+  p = beam.Pipeline(options=pipeline_options)
+
+  user_options = pipeline_options.view_as(TemplatedUserOptions)
+  my_sum_fn = MySumFn(user_options.templated_int)
+  sum = (p
+         | 'ReadCollection' >> beam.io.ReadFromText(
+             'gs://some/integer_collection')
+         | 'StringToInt' >> beam.Map(lambda w: int(w))
+         | 'AddGivenInt' >> beam.ParDo(my_sum_fn)
+         | 'WriteResultingCollection' >> WriteToText('some/output_path'))
+  # [END examples_ptransforms_templated]
+
+  # Templates are not supported by DirectRunner (only by DataflowRunner)
+  # so a value must be provided at graph-construction time
+  my_sum_fn.templated_int = StaticValueProvider(int, 10)
+
+  p.visit(SnippetUtils.RenameFiles(renames))
+  result = p.run()
+  result.wait_until_finish()
+
+
 import apache_beam as beam
 from apache_beam.io import iobase
 from apache_beam.io.range_trackers import OffsetRangeTracker

http://git-wip-us.apache.org/repos/asf/beam/blob/fdf7e330/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index e302465..9183d0d 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -636,7 +636,8 @@ class SnippetsTest(unittest.TestCase):
   def test_examples_wordcount(self):
     pipelines = [snippets.examples_wordcount_minimal,
                  snippets.examples_wordcount_wordcount,
-                 snippets.pipeline_monitoring]
+                 snippets.pipeline_monitoring,
+                 snippets.examples_wordcount_templated]
 
     for pipeline in pipelines:
       temp_path = self.create_temp_file(
@@ -647,6 +648,17 @@ class SnippetsTest(unittest.TestCase):
           self.get_output(result_path),
           ['abc: 2', 'def: 1', 'ghi: 1', 'jkl: 1'])
 
+  def test_examples_ptransforms_templated(self):
+    pipelines = [snippets.examples_ptransforms_templated]
+
+    for pipeline in pipelines:
+      temp_path = self.create_temp_file('1\n 2\n 3')
+      result_path = self.create_temp_file()
+      pipeline({'read': temp_path, 'write': result_path})
+      self.assertEqual(
+          self.get_output(result_path),
+          ['11', '12', '13'])
+
   def test_examples_wordcount_debugging(self):
     temp_path = self.create_temp_file(
         'Flourish Flourish Flourish stomach abc def')

Reply via email to