Cleanup and fix combiners_test.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/01830510 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/01830510 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/01830510 Branch: refs/heads/python-sdk Commit: 01830510bded3c22ddd96937f5d83547702a4385 Parents: 7c186ce Author: Robert Bradshaw <rober...@google.com> Authored: Fri Jul 22 16:05:23 2016 -0700 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Sat Jul 23 16:43:46 2016 -0700 ---------------------------------------------------------------------- .../apache_beam/transforms/combiners_test.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/01830510/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 0439fe1..c970382 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -79,12 +79,11 @@ class CombineTest(unittest.TestCase): assert_that(result_cmp, equal_to([[9, 6, 6, 5, 3, 2]]), label='assert:cmp') # Again for per-key combines. - pcoll = pipeline | Create( - 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) + pcoll = pipeline | 'start-perkye' >> Create( + [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) result_key_top = pcoll | 'top-perkey' >> combine.Top.LargestPerKey(5) result_key_bot = pcoll | 'bot-perkey' >> combine.Top.SmallestPerKey(4) - result_key_cmp = pcoll | combine.Top.PerKey( - 'cmp-perkey', + result_key_cmp = pcoll | 'cmp-perkey' >> combine.Top.PerKey( 6, lambda a, b, names: len(names[a]) < len(names[b]), names) # Note parameter passed to comparator. @@ -105,11 +104,11 @@ class CombineTest(unittest.TestCase): assert_that(result_top, equal_to([[9, 6, 6, 5, 3]]), label='assert:top') assert_that(result_bot, equal_to([[0, 1, 1, 1]]), label='assert:bot') - pcoll = pipeline | Create( - 'start-perkey', [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) + pcoll = pipeline | 'start-perkey' >> Create( + [('a', x) for x in [6, 3, 1, 1, 9, 1, 5, 2, 0, 6]]) result_ktop = pcoll | 'top-perkey' >> beam.CombinePerKey(combine.Largest(5)) - result_kbot = pcoll | beam.CombinePerKey( - 'bot-perkey', combine.Smallest(4)) + result_kbot = pcoll | 'bot-perkey' >> beam.CombinePerKey( + combine.Smallest(4)) assert_that(result_ktop, equal_to([('a', [9, 6, 6, 5, 3])]), label='k:top') assert_that(result_kbot, equal_to([('a', [0, 1, 1, 1])]), label='k:bot') pipeline.run() @@ -138,8 +137,7 @@ class CombineTest(unittest.TestCase): # Now test per-key samples. pipeline = Pipeline('DirectPipelineRunner') - pcoll = pipeline | Create( - 'start-perkey', + pcoll = pipeline | 'start-perkey' >> Create( sum(([(i, 1), (i, 1), (i, 2), (i, 2)] for i in xrange(300)), [])) result = pcoll | 'sample' >> combine.Sample.FixedSizePerKey(3) @@ -158,7 +156,7 @@ class CombineTest(unittest.TestCase): p = Pipeline('DirectPipelineRunner') result = ( p - | 'a' >> Create([(100, 0.0), ('b', 10, -1), ('c', 1, 100)]) + | Create([('a', 100, 0.0), ('b', 10, -1), ('c', 1, 100)]) | beam.CombineGlobally(combine.TupleCombineFn(max, combine.MeanCombineFn(), sum)).without_defaults())