Cleanup and fix combiners_test.

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01830510
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01830510
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01830510

Branch: refs/heads/python-sdk
Commit: 01830510bded3c22ddd96937f5d83547702a4385
Parents: 7c186ce
Author: Robert Bradshaw <rober...@google.com>
Authored: Fri Jul 22 16:05:23 2016 -0700
Committer: Robert Bradshaw <rober...@gmail.com>
Committed: Sat Jul 23 16:43:46 2016 -0700

----------------------------------------------------------------------
 .../apache_beam/transforms/combiners_test.py    | 20 +++++++++-----------
 1 file changed, 9 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01830510/sdks/python/apache_beam/transforms/combiners_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/combiners_test.py 
b/sdks/python/apache_beam/transforms/combiners_test.py
index 0439fe1..c970382 100644
--- a/sdks/python/apache_beam/transforms/combiners_test.py
+++ b/sdks/python/apache_beam/transforms/combiners_test.py
@@ -79,12 +79,11 @@ class CombineTest(unittest.TestCase):
     assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp')
 
     # Again for per-key combines.
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+    pcoll = pipeline | 'start-perkye' >> Create(
+        [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
     result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5)
     result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4)
-    result_key_cmp = pcoll | combine.Top.PerKey(
-        'cmp-perkey',
+    result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey(
         6,
         lambda a, b, names: len(names[a]) < len(names[b]),
         names)  # Note parameter passed to comparator.
@@ -105,11 +104,11 @@ class CombineTest(unittest.TestCase):
     assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top')
     assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot')
 
-    pcoll = pipeline | Create(
-        'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
+    pcoll = pipeline | 'start-perkey' >> Create(
+        [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]])
     result_ktop = pcoll | 'top-perkey' >> 
beam.CombinePerKey(combine.Largest(5))
-    result_kbot = pcoll | beam.CombinePerKey(
-        'bot-perkey', combine.Smallest(4))
+    result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey(
+        combine.Smallest(4))
     assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top')
     assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot')
     pipeline.run()
@@ -138,8 +137,7 @@ class CombineTest(unittest.TestCase):
 
     # Now test per-key samples.
     pipeline = Pipeline('DirectPipelineRunner')
-    pcoll = pipeline | Create(
-        'start-perkey',
+    pcoll = pipeline | 'start-perkey' >> Create(
         sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), []))
     result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3)
 
@@ -158,7 +156,7 @@ class CombineTest(unittest.TestCase):
     p = Pipeline('DirectPipelineRunner')
     result = (
         p
-        | 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)])
+        | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)])
         | beam.CombineGlobally(combine.TupleCombineFn(max,
                                                       combine.MeanCombineFn(),
                                                       sum)).without_defaults())

Reply via email to