Move assert_that, equal_to, is_empty to apache_beam.testing.util
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/2070f118 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/2070f118 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/2070f118 Branch: refs/heads/release-2.0.0 Commit: 2070f1182d49e3b7b3e9ed8a35173cb165fa5bfb Parents: d0da682 Author: Charles Chen <c...@google.com> Authored: Thu May 11 15:07:30 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Thu May 11 16:20:37 2017 -0700 ---------------------------------------------------------------------- .../examples/complete/autocomplete_test.py | 4 +- .../examples/complete/estimate_pi_test.py | 4 +- .../complete/game/hourly_team_score_test.py | 4 +- .../examples/complete/game/user_score_test.py | 4 +- .../apache_beam/examples/complete/tfidf_test.py | 4 +- .../complete/top_wikipedia_sessions_test.py | 4 +- .../cookbook/bigquery_side_input_test.py | 4 +- .../cookbook/bigquery_tornadoes_test.py | 6 +- .../examples/cookbook/coders_test.py | 4 +- .../examples/cookbook/combiners_test.py | 6 +- .../examples/cookbook/custom_ptransform_test.py | 4 +- .../examples/cookbook/filters_test.py | 12 ++- .../examples/cookbook/mergecontacts.py | 14 +-- .../apache_beam/examples/snippets/snippets.py | 17 +-- .../examples/snippets/snippets_test.py | 30 +++--- .../apache_beam/examples/wordcount_debugging.py | 6 +- sdks/python/apache_beam/io/avroio_test.py | 4 +- .../python/apache_beam/io/concat_source_test.py | 4 +- .../apache_beam/io/filebasedsource_test.py | 4 +- sdks/python/apache_beam/io/sources_test.py | 4 +- sdks/python/apache_beam/io/textio_test.py | 5 +- sdks/python/apache_beam/io/tfrecordio_test.py | 24 +++-- sdks/python/apache_beam/pipeline_test.py | 4 +- .../portability/maptask_executor_runner_test.py | 6 +- sdks/python/apache_beam/runners/runner_test.py | 4 +- sdks/python/apache_beam/testing/util.py | 107 +++++++++++++++++++ sdks/python/apache_beam/testing/util_test.py | 50 +++++++++ .../apache_beam/transforms/combiners_test.py | 2 +- .../apache_beam/transforms/create_test.py | 3 +- .../apache_beam/transforms/ptransform_test.py | 2 +- .../apache_beam/transforms/sideinputs_test.py | 2 +- .../apache_beam/transforms/trigger_test.py | 2 +- sdks/python/apache_beam/transforms/util.py | 79 -------------- sdks/python/apache_beam/transforms/util_test.py | 50 --------- .../apache_beam/transforms/window_test.py | 2 +- .../transforms/write_ptransform_test.py | 2 +- .../typehints/typed_pipeline_test.py | 2 +- 37 files changed, 271 insertions(+), 218 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/autocomplete_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/autocomplete_test.py b/sdks/python/apache_beam/examples/complete/autocomplete_test.py index 438633a..378d222 100644 --- a/sdks/python/apache_beam/examples/complete/autocomplete_test.py +++ b/sdks/python/apache_beam/examples/complete/autocomplete_test.py @@ -22,8 +22,8 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import autocomplete from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class AutocompleteTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/estimate_pi_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py index 12d8379..fd51309 100644 --- a/sdks/python/apache_beam/examples/complete/estimate_pi_test.py +++ b/sdks/python/apache_beam/examples/complete/estimate_pi_test.py @@ -22,8 +22,8 @@ import unittest from apache_beam.examples.complete import estimate_pi from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import BeamAssertException +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import BeamAssertException def in_between(lower, upper): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py index bd0abca..9c30127 100644 --- a/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py +++ b/sdks/python/apache_beam/examples/complete/game/hourly_team_score_test.py @@ -23,6 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.complete.game import hourly_team_score from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class HourlyTeamScoreTest(unittest.TestCase): @@ -44,7 +46,7 @@ class HourlyTeamScoreTest(unittest.TestCase): start_min='2015-11-16-15-20', stop_min='2015-11-16-17-20', window_duration=60)) - beam.assert_that(result, beam.equal_to([ + assert_that(result, equal_to([ ('team1', 18), ('team2', 2), ('team3', 13)])) http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/game/user_score_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/game/user_score_test.py b/sdks/python/apache_beam/examples/complete/game/user_score_test.py index 2db53bd..59903d9 100644 --- a/sdks/python/apache_beam/examples/complete/game/user_score_test.py +++ b/sdks/python/apache_beam/examples/complete/game/user_score_test.py @@ -23,6 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.complete.game import user_score from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class UserScoreTest(unittest.TestCase): @@ -40,7 +42,7 @@ class UserScoreTest(unittest.TestCase): with TestPipeline() as p: result = ( p | beam.Create(UserScoreTest.SAMPLE_DATA) | user_score.UserScore()) - beam.assert_that(result, beam.equal_to([ + assert_that(result, equal_to([ ('user1_team1', 50), ('user2_team2', 2), ('user3_team3', 8), ('user4_team3', 5)])) http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/tfidf_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/tfidf_test.py b/sdks/python/apache_beam/examples/complete/tfidf_test.py index 0e30254..f177dfc 100644 --- a/sdks/python/apache_beam/examples/complete/tfidf_test.py +++ b/sdks/python/apache_beam/examples/complete/tfidf_test.py @@ -26,6 +26,8 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import tfidf from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to EXPECTED_RESULTS = set([ @@ -57,7 +59,7 @@ class TfIdfTest(unittest.TestCase): uri_to_line | tfidf.TfIdf() | beam.Map(lambda (word, (uri, tfidf)): (word, uri, tfidf))) - beam.assert_that(result, beam.equal_to(EXPECTED_RESULTS)) + assert_that(result, equal_to(EXPECTED_RESULTS)) # Run the pipeline. Note that the assert_that above adds to the pipeline # a check that the result PCollection contains expected values. To actually # trigger the check the pipeline must be run. http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py index 4850c04..5fb6276 100644 --- a/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py +++ b/sdks/python/apache_beam/examples/complete/top_wikipedia_sessions_test.py @@ -24,6 +24,8 @@ import unittest import apache_beam as beam from apache_beam.examples.complete import top_wikipedia_sessions from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class ComputeTopSessionsTest(unittest.TestCase): @@ -54,7 +56,7 @@ class ComputeTopSessionsTest(unittest.TestCase): edits = p | beam.Create(self.EDITS) result = edits | top_wikipedia_sessions.ComputeTopSessions(1.0) - beam.assert_that(result, beam.equal_to(self.EXPECTED)) + assert_that(result, equal_to(self.EXPECTED)) p.run() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py index 1ca25c9..b11dc47 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py @@ -23,6 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import bigquery_side_input from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class BigQuerySideInputTest(unittest.TestCase): @@ -42,7 +44,7 @@ class BigQuerySideInputTest(unittest.TestCase): words_pcoll, ignore_corpus_pcoll, ignore_word_pcoll) - beam.assert_that(groups, beam.equal_to( + assert_that(groups, equal_to( [('A', 'corpus2', 'word2'), ('B', 'corpus2', 'word2'), ('C', 'corpus2', 'word2')])) http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py index ca7ca9e..c926df8 100644 --- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py +++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes_test.py @@ -23,6 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import bigquery_tornadoes from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class BigQueryTornadoesTest(unittest.TestCase): @@ -35,8 +37,8 @@ class BigQueryTornadoesTest(unittest.TestCase): {'month': 1, 'day': 3, 'tornado': True}, {'month': 2, 'day': 1, 'tornado': True}])) results = bigquery_tornadoes.count_tornadoes(rows) - beam.assert_that(results, beam.equal_to([{'month': 1, 'tornado_count': 2}, - {'month': 2, 'tornado_count': 1}])) + assert_that(results, equal_to([{'month': 1, 'tornado_count': 2}, + {'month': 2, 'tornado_count': 1}])) p.run().wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/coders_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/coders_test.py b/sdks/python/apache_beam/examples/cookbook/coders_test.py index 35cf252..f71dad8 100644 --- a/sdks/python/apache_beam/examples/cookbook/coders_test.py +++ b/sdks/python/apache_beam/examples/cookbook/coders_test.py @@ -23,8 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import coders from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class CodersTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/combiners_test.py b/sdks/python/apache_beam/examples/cookbook/combiners_test.py index 45c779f..ee1fb77 100644 --- a/sdks/python/apache_beam/examples/cookbook/combiners_test.py +++ b/sdks/python/apache_beam/examples/cookbook/combiners_test.py @@ -28,6 +28,8 @@ import unittest import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class CombinersTest(unittest.TestCase): @@ -49,7 +51,7 @@ class CombinersTest(unittest.TestCase): | beam.Create(CombinersTest.SAMPLE_DATA) | beam.CombinePerKey(sum)) - beam.assert_that(result, beam.equal_to([('a', 6), ('b', 30), ('c', 100)])) + assert_that(result, equal_to([('a', 6), ('b', 30), ('c', 100)])) result.pipeline.run() def test_combine_per_key_with_custom_callable(self): @@ -65,7 +67,7 @@ class CombinersTest(unittest.TestCase): | beam.Create(CombinersTest.SAMPLE_DATA) | beam.CombinePerKey(multiply)) - beam.assert_that(result, beam.equal_to([('a', 6), ('b', 200), ('c', 100)])) + assert_that(result, equal_to([('a', 6), ('b', 200), ('c', 100)])) result.pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py index 2d35d8d..c7c6dba 100644 --- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py +++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform_test.py @@ -23,8 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import custom_ptransform from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class CustomCountTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/filters_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/filters_test.py b/sdks/python/apache_beam/examples/cookbook/filters_test.py index 44a352f..fd49f93 100644 --- a/sdks/python/apache_beam/examples/cookbook/filters_test.py +++ b/sdks/python/apache_beam/examples/cookbook/filters_test.py @@ -23,6 +23,8 @@ import unittest import apache_beam as beam from apache_beam.examples.cookbook import filters from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class FiltersTest(unittest.TestCase): @@ -45,22 +47,22 @@ class FiltersTest(unittest.TestCase): def test_basic(self): """Test that the correct result is returned for a simple dataset.""" results = self._get_result_for_month(1) - beam.assert_that( + assert_that( results, - beam.equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3}, - {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}])) + equal_to([{'year': 2010, 'month': 1, 'day': 1, 'mean_temp': 3}, + {'year': 2012, 'month': 1, 'day': 2, 'mean_temp': 3}])) results.pipeline.run() def test_basic_empty(self): """Test that the correct empty result is returned for a simple dataset.""" results = self._get_result_for_month(3) - beam.assert_that(results, beam.equal_to([])) + assert_that(results, equal_to([])) results.pipeline.run() def test_basic_empty_missing(self): """Test that the correct empty result is returned for a missing month.""" results = self._get_result_for_month(4) - beam.assert_that(results, beam.equal_to([])) + assert_that(results, equal_to([])) results.pipeline.run() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/cookbook/mergecontacts.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py index 5aaba10..4f53c61 100644 --- a/sdks/python/apache_beam/examples/cookbook/mergecontacts.py +++ b/sdks/python/apache_beam/examples/cookbook/mergecontacts.py @@ -40,6 +40,8 @@ from apache_beam.io import ReadFromText from apache_beam.io import WriteToText from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to def run(argv=None, assert_results=None): @@ -118,12 +120,12 @@ def run(argv=None, assert_results=None): # TODO(silviuc): Move the assert_results logic to the unit test. if assert_results is not None: expected_luddites, expected_writers, expected_nomads = assert_results - beam.assert_that(num_luddites, beam.equal_to([expected_luddites]), - label='assert:luddites') - beam.assert_that(num_writers, beam.equal_to([expected_writers]), - label='assert:writers') - beam.assert_that(num_nomads, beam.equal_to([expected_nomads]), - label='assert:nomads') + assert_that(num_luddites, equal_to([expected_luddites]), + label='assert:luddites') + assert_that(num_writers, equal_to([expected_writers]), + label='assert:writers') + assert_that(num_nomads, equal_to([expected_nomads]), + label='assert:nomads') # Execute pipeline. return p.run() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py index 1bdb9a3..7259572 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets.py +++ b/sdks/python/apache_beam/examples/snippets/snippets.py @@ -33,6 +33,8 @@ string. The tags can contain only letters, digits and _. import apache_beam as beam from apache_beam.metrics import Metrics from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to # Quiet some pylint warnings that happen because of the somewhat special # format for the code snippets. @@ -566,8 +568,9 @@ def examples_wordcount_debugging(renames): | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach'))) # [START example_wordcount_debugging_assert] - beam.assert_that( - filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)])) + beam.testing.util.assert_that( + filtered_words, beam.testing.util.equal_to( + [('Flourish', 3), ('stomach', 1)])) # [END example_wordcount_debugging_assert] output = (filtered_words @@ -661,8 +664,8 @@ def model_custom_source(count): # [END model_custom_source_use_new_source] lines = numbers | beam.core.Map(lambda number: 'line %d' % number) - beam.assert_that( - lines, beam.equal_to( + assert_that( + lines, equal_to( ['line ' + str(number) for number in range(0, count)])) p.run().wait_until_finish() @@ -691,8 +694,8 @@ def model_custom_source(count): # [END model_custom_source_use_ptransform] lines = numbers | beam.core.Map(lambda number: 'line %d' % number) - beam.assert_that( - lines, beam.equal_to( + assert_that( + lines, equal_to( ['line ' + str(number) for number in range(0, count)])) # Don't test runner api due to pickling errors. @@ -872,7 +875,7 @@ def model_textio_compressed(renames, expected): compression_type=beam.io.filesystem.CompressionTypes.GZIP) # [END model_textio_write_compressed] - beam.assert_that(lines, beam.equal_to(expected)) + assert_that(lines, equal_to(expected)) p.visit(SnippetUtils.RenameFiles(renames)) p.run().wait_until_finish() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/snippets/snippets_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py index 37cd470..f7b51a7 100644 --- a/sdks/python/apache_beam/examples/snippets/snippets_test.py +++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py @@ -30,10 +30,10 @@ from apache_beam import coders from apache_beam import pvalue from apache_beam import typehints from apache_beam.coders.coders import ToStringCoder -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.examples.snippets import snippets +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from apache_beam.utils.windowed_value import WindowedValue # pylint: disable=expression-not-assigned @@ -158,11 +158,11 @@ class ParDoTest(unittest.TestCase): avg_word_len)) # [END model_pardo_side_input] - beam.assert_that(small_words, beam.equal_to(['a', 'bb', 'ccc'])) - beam.assert_that(larger_than_average, beam.equal_to(['ccc', 'dddd']), - label='larger_than_average') - beam.assert_that(small_but_nontrivial, beam.equal_to(['bb']), - label='small_but_not_trivial') + assert_that(small_words, equal_to(['a', 'bb', 'ccc'])) + assert_that(larger_than_average, equal_to(['ccc', 'dddd']), + label='larger_than_average') + assert_that(small_but_nontrivial, equal_to(['bb']), + label='small_but_not_trivial') p.run() def test_pardo_side_input_dofn(self): @@ -816,7 +816,7 @@ class CombineTest(unittest.TestCase): | 'group' >> beam.GroupByKey() | 'combine' >> beam.CombineValues(sum)) unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - beam.assert_that(unkeyed, beam.equal_to([110, 215, 120])) + assert_that(unkeyed, equal_to([110, 215, 120])) p.run() def test_setting_sliding_windows(self): @@ -834,8 +834,8 @@ class CombineTest(unittest.TestCase): | 'group' >> beam.GroupByKey() | 'combine' >> beam.CombineValues(sum)) unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - beam.assert_that(unkeyed, - beam.equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41])) + assert_that(unkeyed, + equal_to([2, 2, 2, 18, 23, 39, 39, 39, 41, 41])) p.run() def test_setting_session_windows(self): @@ -853,8 +853,8 @@ class CombineTest(unittest.TestCase): | 'group' >> beam.GroupByKey() | 'combine' >> beam.CombineValues(sum)) unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - beam.assert_that(unkeyed, - beam.equal_to([29, 27])) + assert_that(unkeyed, + equal_to([29, 27])) p.run() def test_setting_global_window(self): @@ -872,7 +872,7 @@ class CombineTest(unittest.TestCase): | 'group' >> beam.GroupByKey() | 'combine' >> beam.CombineValues(sum)) unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - beam.assert_that(unkeyed, beam.equal_to([56])) + assert_that(unkeyed, equal_to([56])) p.run() def test_setting_timestamp(self): @@ -903,7 +903,7 @@ class CombineTest(unittest.TestCase): | 'group' >> beam.GroupByKey() | 'combine' >> beam.CombineValues(sum)) unkeyed = summed | 'unkey' >> beam.Map(lambda x: x[1]) - beam.assert_that(unkeyed, beam.equal_to([42, 187])) + assert_that(unkeyed, equal_to([42, 187])) p.run() @@ -921,7 +921,7 @@ class PTransformTest(unittest.TestCase): p = TestPipeline() lengths = p | beam.Create(["a", "ab", "abc"]) | ComputeWordLengths() - beam.assert_that(lengths, beam.equal_to([1, 2, 3])) + assert_that(lengths, equal_to([1, 2, 3])) p.run() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/examples/wordcount_debugging.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_debugging.py b/sdks/python/apache_beam/examples/wordcount_debugging.py index 98acde4..ca9f7b6 100644 --- a/sdks/python/apache_beam/examples/wordcount_debugging.py +++ b/sdks/python/apache_beam/examples/wordcount_debugging.py @@ -51,6 +51,8 @@ from apache_beam.io import WriteToText from apache_beam.metrics import Metrics from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class FilterTextFn(beam.DoFn): @@ -133,8 +135,8 @@ def run(argv=None): # of the Pipeline implies that the expectations were met. Learn more at # https://cloud.google.com/dataflow/pipelines/testing-your-pipeline on how to # test your pipeline. - beam.assert_that( - filtered_words, beam.equal_to([('Flourish', 3), ('stomach', 1)])) + assert_that( + filtered_words, equal_to([('Flourish', 3), ('stomach', 1)])) # Format the counts into a PCollection of strings and write the output using a # "Write" transform that has side effects. http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/avroio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/avroio_test.py b/sdks/python/apache_beam/io/avroio_test.py index 4a21839..6dcf121 100644 --- a/sdks/python/apache_beam/io/avroio_test.py +++ b/sdks/python/apache_beam/io/avroio_test.py @@ -27,10 +27,10 @@ from apache_beam.io import avroio from apache_beam.io import filebasedsource from apache_beam.io import source_test_utils from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to # Importing following private class for testing purposes. from apache_beam.io.avroio import _AvroSource as AvroSource http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/concat_source_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/concat_source_test.py b/sdks/python/apache_beam/io/concat_source_test.py index a02f9ad..4a8f519 100644 --- a/sdks/python/apache_beam/io/concat_source_test.py +++ b/sdks/python/apache_beam/io/concat_source_test.py @@ -27,8 +27,8 @@ from apache_beam.io import range_trackers from apache_beam.io import source_test_utils from apache_beam.io.concat_source import ConcatSource from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class RangeSource(iobase.BoundedSource): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/filebasedsource_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py index e17a004..afb340d 100644 --- a/sdks/python/apache_beam/io/filebasedsource_test.py +++ b/sdks/python/apache_beam/io/filebasedsource_test.py @@ -41,10 +41,10 @@ from apache_beam.io.filebasedsource import FileBasedSource from apache_beam.options.value_provider import StaticValueProvider from apache_beam.options.value_provider import RuntimeValueProvider from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to class LineSource(FileBasedSource): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/sources_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/sources_test.py b/sdks/python/apache_beam/io/sources_test.py index c0b8ad6..10d401b 100644 --- a/sdks/python/apache_beam/io/sources_test.py +++ b/sdks/python/apache_beam/io/sources_test.py @@ -28,8 +28,8 @@ from apache_beam import coders from apache_beam.io import iobase from apache_beam.io import range_trackers from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to class LineSource(iobase.BoundedSource): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/textio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py index d00afef..9a4ec47 100644 --- a/sdks/python/apache_beam/io/textio_test.py +++ b/sdks/python/apache_beam/io/textio_test.py @@ -44,9 +44,8 @@ from apache_beam.io.filebasedsource_test import write_pattern from apache_beam.io.filesystem import CompressionTypes from apache_beam.testing.test_pipeline import TestPipeline - -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to # TODO: Refactor code so all io tests are using same library http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/io/tfrecordio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py index b7e370d..3c70ade 100644 --- a/sdks/python/apache_beam/io/tfrecordio_test.py +++ b/sdks/python/apache_beam/io/tfrecordio_test.py @@ -36,6 +36,8 @@ from apache_beam.io.tfrecordio import _TFRecordUtil from apache_beam.io.tfrecordio import ReadFromTFRecord from apache_beam.io.tfrecordio import WriteToTFRecord from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to import crcmod @@ -254,7 +256,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): coder=coders.BytesCoder(), compression_type=CompressionTypes.AUTO, validate=True))) - beam.assert_that(result, beam.equal_to(['foo'])) + assert_that(result, equal_to(['foo'])) def test_process_multiple(self): path = os.path.join(self._new_tempdir(), 'result') @@ -267,7 +269,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): coder=coders.BytesCoder(), compression_type=CompressionTypes.AUTO, validate=True))) - beam.assert_that(result, beam.equal_to(['foo', 'bar'])) + assert_that(result, equal_to(['foo', 'bar'])) def test_process_gzip(self): path = os.path.join(self._new_tempdir(), 'result') @@ -280,7 +282,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): coder=coders.BytesCoder(), compression_type=CompressionTypes.GZIP, validate=True))) - beam.assert_that(result, beam.equal_to(['foo', 'bar'])) + assert_that(result, equal_to(['foo', 'bar'])) def test_process_auto(self): path = os.path.join(self._new_tempdir(), 'result.gz') @@ -293,7 +295,7 @@ class TestTFRecordSource(_TestCaseWithTempDirCleanUp): coder=coders.BytesCoder(), compression_type=CompressionTypes.AUTO, validate=True))) - beam.assert_that(result, beam.equal_to(['foo', 'bar'])) + assert_that(result, equal_to(['foo', 'bar'])) class TestReadFromTFRecordSource(TestTFRecordSource): @@ -305,7 +307,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource): result = (p | ReadFromTFRecord( path, compression_type=CompressionTypes.GZIP)) - beam.assert_that(result, beam.equal_to(['foo', 'bar'])) + assert_that(result, equal_to(['foo', 'bar'])) def test_process_gzip_auto(self): path = os.path.join(self._new_tempdir(), 'result.gz') @@ -314,7 +316,7 @@ class TestReadFromTFRecordSource(TestTFRecordSource): result = (p | ReadFromTFRecord( path, compression_type=CompressionTypes.AUTO)) - beam.assert_that(result, beam.equal_to(['foo', 'bar'])) + assert_that(result, equal_to(['foo', 'bar'])) class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp): @@ -337,7 +339,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp): # Read the file back and compare. with TestPipeline() as p: actual_data = p | ReadFromTFRecord(file_path_prefix + '-*') - beam.assert_that(actual_data, beam.equal_to(expected_data)) + assert_that(actual_data, equal_to(expected_data)) def test_end2end_auto_compression(self): file_path_prefix = os.path.join(self._new_tempdir(), 'result') @@ -351,7 +353,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp): # Read the file back and compare. with TestPipeline() as p: actual_data = p | ReadFromTFRecord(file_path_prefix + '-*') - beam.assert_that(actual_data, beam.equal_to(expected_data)) + assert_that(actual_data, equal_to(expected_data)) def test_end2end_auto_compression_unsharded(self): file_path_prefix = os.path.join(self._new_tempdir(), 'result') @@ -365,7 +367,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp): # Read the file back and compare. with TestPipeline() as p: actual_data = p | ReadFromTFRecord(file_path_prefix + '.gz') - beam.assert_that(actual_data, beam.equal_to(expected_data)) + assert_that(actual_data, equal_to(expected_data)) @unittest.skipIf(tf is None, 'tensorflow not installed.') def test_end2end_example_proto(self): @@ -385,7 +387,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp): actual_data = (p | ReadFromTFRecord( file_path_prefix + '-*', coder=beam.coders.ProtoCoder(example.__class__))) - beam.assert_that(actual_data, beam.equal_to([example])) + assert_that(actual_data, equal_to([example])) def test_end2end_read_write_read(self): path = os.path.join(self._new_tempdir(), 'result') @@ -400,7 +402,7 @@ class TestEnd2EndWriteAndRead(_TestCaseWithTempDirCleanUp): # Read the file back and compare. with TestPipeline() as p: actual_data = p | ReadFromTFRecord(path+'-*', validate=True) - beam.assert_that(actual_data, beam.equal_to(expected_data)) + assert_that(actual_data, equal_to(expected_data)) if __name__ == '__main__': http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py index 8aa8a8a..e0775d1 100644 --- a/sdks/python/apache_beam/pipeline_test.py +++ b/sdks/python/apache_beam/pipeline_test.py @@ -33,6 +33,8 @@ from apache_beam.pipeline import PipelineVisitor from apache_beam.pvalue import AsSingleton from apache_beam.runners.dataflow.native_io.iobase import NativeSource from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from apache_beam.transforms import CombineGlobally from apache_beam.transforms import Create from apache_beam.transforms import FlatMap @@ -41,8 +43,6 @@ from apache_beam.transforms import DoFn from apache_beam.transforms import ParDo from apache_beam.transforms import PTransform from apache_beam.transforms import WindowInto -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to from apache_beam.transforms.window import SlidingWindows from apache_beam.transforms.window import TimestampedValue from apache_beam.utils.timestamp import MIN_TIMESTAMP http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py index 062e6f9..b7ba15a 100644 --- a/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/maptask_executor_runner_test.py @@ -28,9 +28,9 @@ from apache_beam.metrics.execution import MetricsEnvironment from apache_beam.metrics.metricbase import MetricName from apache_beam.pvalue import AsList -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import BeamAssertException -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import BeamAssertException +from apache_beam.testing.util import equal_to from apache_beam.transforms.window import TimestampedValue from apache_beam.runners.portability import maptask_executor_runner http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/runners/runner_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/runner_test.py b/sdks/python/apache_beam/runners/runner_test.py index c61c49f..fa80b1c 100644 --- a/sdks/python/apache_beam/runners/runner_test.py +++ b/sdks/python/apache_beam/runners/runner_test.py @@ -36,8 +36,8 @@ from apache_beam.metrics.metricbase import MetricName from apache_beam.pipeline import Pipeline from apache_beam.runners import DirectRunner from apache_beam.runners import create_runner -from apache_beam.transforms.util import assert_that -from apache_beam.transforms.util import equal_to +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to from apache_beam.options.pipeline_options import PipelineOptions http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py new file mode 100644 index 0000000..60a6b21 --- /dev/null +++ b/sdks/python/apache_beam/testing/util.py @@ -0,0 +1,107 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Utilities for testing Beam pipelines.""" + +from __future__ import absolute_import + +from apache_beam import pvalue +from apache_beam.transforms import window +from apache_beam.transforms.core import Create +from apache_beam.transforms.core import Map +from apache_beam.transforms.core import WindowInto +from apache_beam.transforms.util import CoGroupByKey +from apache_beam.transforms.ptransform import PTransform + + +__all__ = [ + 'assert_that', + 'equal_to', + 'is_empty', + ] + + +class BeamAssertException(Exception): + """Exception raised by matcher classes used by assert_that transform.""" + + pass + + +# Note that equal_to always sorts the expected and actual since what we +# compare are PCollections for which there is no guaranteed order. +# However the sorting does not go beyond top level therefore [1,2] and [2,1] +# are considered equal and [[1,2]] and [[2,1]] are not. +# TODO(silviuc): Add contains_in_any_order-style matchers. +def equal_to(expected): + expected = list(expected) + + def _equal(actual): + sorted_expected = sorted(expected) + sorted_actual = sorted(actual) + if sorted_expected != sorted_actual: + raise BeamAssertException( + 'Failed assert: %r == %r' % (sorted_expected, sorted_actual)) + return _equal + + +def is_empty(): + def _empty(actual): + actual = list(actual) + if actual: + raise BeamAssertException( + 'Failed assert: [] == %r' % actual) + return _empty + + +def assert_that(actual, matcher, label='assert_that'): + """A PTransform that checks a PCollection has an expected value. + + Note that assert_that should be used only for testing pipelines since the + check relies on materializing the entire PCollection being checked. + + Args: + actual: A PCollection. + matcher: A matcher function taking as argument the actual value of a + materialized PCollection. The matcher validates this actual value against + expectations and raises BeamAssertException if they are not met. + label: Optional string label. This is needed in case several assert_that + transforms are introduced in the same pipeline. + + Returns: + Ignored. + """ + assert isinstance(actual, pvalue.PCollection) + + class AssertThat(PTransform): + + def expand(self, pcoll): + # We must have at least a single element to ensure the matcher + # code gets run even if the input pcollection is empty. + keyed_singleton = pcoll.pipeline | Create([(None, None)]) + keyed_actual = ( + pcoll + | WindowInto(window.GlobalWindows()) + | "ToVoidKey" >> Map(lambda v: (None, v))) + _ = ((keyed_singleton, keyed_actual) + | "Group" >> CoGroupByKey() + | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values) + | "Match" >> Map(matcher)) + + def default_label(self): + return label + + actual | AssertThat() # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/testing/util_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/util_test.py b/sdks/python/apache_beam/testing/util_test.py new file mode 100644 index 0000000..1acebb6 --- /dev/null +++ b/sdks/python/apache_beam/testing/util_test.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Unit tests for testing utilities.""" + +import unittest + +from apache_beam import Create +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to, is_empty + + +class UtilTest(unittest.TestCase): + + def test_assert_that_passes(self): + with TestPipeline() as p: + assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3])) + + def test_assert_that_fails(self): + with self.assertRaises(Exception): + with TestPipeline() as p: + assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3])) + + def test_assert_that_fails_on_empty_input(self): + with self.assertRaises(Exception): + with TestPipeline() as p: + assert_that(p | Create([]), equal_to([1, 2, 3])) + + def test_assert_that_fails_on_empty_expected(self): + with self.assertRaises(Exception): + with TestPipeline() as p: + assert_that(p | Create([1, 2, 3]), is_empty()) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/combiners_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/combiners_test.py b/sdks/python/apache_beam/transforms/combiners_test.py index 1822c19..946a60a 100644 --- a/sdks/python/apache_beam/transforms/combiners_test.py +++ b/sdks/python/apache_beam/transforms/combiners_test.py @@ -24,13 +24,13 @@ import hamcrest as hc import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline import apache_beam.transforms.combiners as combine +from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms.core import CombineGlobally from apache_beam.transforms.core import Create from apache_beam.transforms.core import Map from apache_beam.transforms.display import DisplayData from apache_beam.transforms.display_test import DisplayDataItemMatcher from apache_beam.transforms.ptransform import PTransform -from apache_beam.transforms.util import assert_that, equal_to class CombineTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/create_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/create_test.py b/sdks/python/apache_beam/transforms/create_test.py index 9ede4c7..55ad7f3 100644 --- a/sdks/python/apache_beam/transforms/create_test.py +++ b/sdks/python/apache_beam/transforms/create_test.py @@ -20,9 +20,10 @@ import unittest from apache_beam.io import source_test_utils -from apache_beam import Create, assert_that, equal_to +from apache_beam import Create from apache_beam.coders import FastPrimitivesCoder from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to class CreateTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 3320d79..f790660 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -33,12 +33,12 @@ from apache_beam.io.iobase import Read from apache_beam.options.pipeline_options import TypeOptions import apache_beam.pvalue as pvalue from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms import window from apache_beam.transforms.core import GroupByKeyOnly import apache_beam.transforms.combiners as combine from apache_beam.transforms.display import DisplayData, DisplayDataItem from apache_beam.transforms.ptransform import PTransform -from apache_beam.transforms.util import assert_that, equal_to import apache_beam.typehints as typehints from apache_beam.typehints import with_input_types from apache_beam.typehints import with_output_types http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/sideinputs_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/sideinputs_test.py b/sdks/python/apache_beam/transforms/sideinputs_test.py index 0bc9107..6500681 100644 --- a/sdks/python/apache_beam/transforms/sideinputs_test.py +++ b/sdks/python/apache_beam/transforms/sideinputs_test.py @@ -24,8 +24,8 @@ from nose.plugins.attrib import attr import apache_beam as beam from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms import window -from apache_beam.transforms.util import assert_that, equal_to class SideInputsTest(unittest.TestCase): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/trigger_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py index 2574c4b..a27f47f 100644 --- a/sdks/python/apache_beam/transforms/trigger_test.py +++ b/sdks/python/apache_beam/transforms/trigger_test.py @@ -27,6 +27,7 @@ import yaml import apache_beam as beam from apache_beam.runners import pipeline_context from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms import trigger from apache_beam.transforms.core import Windowing from apache_beam.transforms.trigger import AccumulationMode @@ -40,7 +41,6 @@ from apache_beam.transforms.trigger import GeneralTriggerDriver from apache_beam.transforms.trigger import InMemoryUnmergedState from apache_beam.transforms.trigger import Repeatedly from apache_beam.transforms.trigger import TriggerFn -from apache_beam.transforms.util import assert_that, equal_to from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import IntervalWindow from apache_beam.transforms.window import MIN_TIMESTAMP http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py index a6ecf0a..a7484ac 100644 --- a/sdks/python/apache_beam/transforms/util.py +++ b/sdks/python/apache_beam/transforms/util.py @@ -20,14 +20,10 @@ from __future__ import absolute_import -from apache_beam import pvalue -from apache_beam.transforms import window from apache_beam.transforms.core import CombinePerKey -from apache_beam.transforms.core import Create from apache_beam.transforms.core import Flatten from apache_beam.transforms.core import GroupByKey from apache_beam.transforms.core import Map -from apache_beam.transforms.core import WindowInto from apache_beam.transforms.ptransform import PTransform from apache_beam.transforms.ptransform import ptransform_fn @@ -38,9 +34,6 @@ __all__ = [ 'KvSwap', 'RemoveDuplicates', 'Values', - 'assert_that', - 'equal_to', - 'is_empty', ] @@ -169,75 +162,3 @@ def RemoveDuplicates(pcoll): # pylint: disable=invalid-name | 'ToPairs' >> Map(lambda v: (v, None)) | 'Group' >> CombinePerKey(lambda vs: None) | 'RemoveDuplicates' >> Keys()) - - -class BeamAssertException(Exception): - """Exception raised by matcher classes used by assert_that transform.""" - - pass - - -# Note that equal_to always sorts the expected and actual since what we -# compare are PCollections for which there is no guaranteed order. -# However the sorting does not go beyond top level therefore [1,2] and [2,1] -# are considered equal and [[1,2]] and [[2,1]] are not. -# TODO(silviuc): Add contains_in_any_order-style matchers. -def equal_to(expected): - expected = list(expected) - - def _equal(actual): - sorted_expected = sorted(expected) - sorted_actual = sorted(actual) - if sorted_expected != sorted_actual: - raise BeamAssertException( - 'Failed assert: %r == %r' % (sorted_expected, sorted_actual)) - return _equal - - -def is_empty(): - def _empty(actual): - actual = list(actual) - if actual: - raise BeamAssertException( - 'Failed assert: [] == %r' % actual) - return _empty - - -def assert_that(actual, matcher, label='assert_that'): - """A PTransform that checks a PCollection has an expected value. - - Note that assert_that should be used only for testing pipelines since the - check relies on materializing the entire PCollection being checked. - - Args: - actual: A PCollection. - matcher: A matcher function taking as argument the actual value of a - materialized PCollection. The matcher validates this actual value against - expectations and raises BeamAssertException if they are not met. - label: Optional string label. This is needed in case several assert_that - transforms are introduced in the same pipeline. - - Returns: - Ignored. - """ - assert isinstance(actual, pvalue.PCollection) - - class AssertThat(PTransform): - - def expand(self, pcoll): - # We must have at least a single element to ensure the matcher - # code gets run even if the input pcollection is empty. - keyed_singleton = pcoll.pipeline | Create([(None, None)]) - keyed_actual = ( - pcoll - | WindowInto(window.GlobalWindows()) - | "ToVoidKey" >> Map(lambda v: (None, v))) - _ = ((keyed_singleton, keyed_actual) - | "Group" >> CoGroupByKey() - | "Unkey" >> Map(lambda (k, (_, actual_values)): actual_values) - | "Match" >> Map(matcher)) - - def default_label(self): - return label - - actual | AssertThat() # pylint: disable=expression-not-assigned http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/util_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py deleted file mode 100644 index 7fdef70..0000000 --- a/sdks/python/apache_beam/transforms/util_test.py +++ /dev/null @@ -1,50 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -"""Unit tests for the util transforms.""" - -import unittest - -from apache_beam import Create -from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that, equal_to, is_empty - - -class UtilTest(unittest.TestCase): - - def test_assert_that_passes(self): - with TestPipeline() as p: - assert_that(p | Create([1, 2, 3]), equal_to([1, 2, 3])) - - def test_assert_that_fails(self): - with self.assertRaises(Exception): - with TestPipeline() as p: - assert_that(p | Create([1, 10, 100]), equal_to([1, 2, 3])) - - def test_assert_that_fails_on_empty_input(self): - with self.assertRaises(Exception): - with TestPipeline() as p: - assert_that(p | Create([]), equal_to([1, 2, 3])) - - def test_assert_that_fails_on_empty_expected(self): - with self.assertRaises(Exception): - with TestPipeline() as p: - assert_that(p | Create([1, 2, 3]), is_empty()) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/window_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py index a7797dd..fd1bb9d 100644 --- a/sdks/python/apache_beam/transforms/window_test.py +++ b/sdks/python/apache_beam/transforms/window_test.py @@ -21,6 +21,7 @@ import unittest from apache_beam.runners import pipeline_context from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, equal_to from apache_beam.transforms import CombinePerKey from apache_beam.transforms import combiners from apache_beam.transforms import core @@ -31,7 +32,6 @@ from apache_beam.transforms import WindowInto from apache_beam.transforms.core import Windowing from apache_beam.transforms.trigger import AccumulationMode from apache_beam.transforms.trigger import AfterCount -from apache_beam.transforms.util import assert_that, equal_to from apache_beam.transforms.window import FixedWindows from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import GlobalWindows http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/transforms/write_ptransform_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/transforms/write_ptransform_test.py b/sdks/python/apache_beam/transforms/write_ptransform_test.py index 27e7caa..e31b9cc 100644 --- a/sdks/python/apache_beam/transforms/write_ptransform_test.py +++ b/sdks/python/apache_beam/transforms/write_ptransform_test.py @@ -23,8 +23,8 @@ import apache_beam as beam from apache_beam.io import iobase from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that, is_empty from apache_beam.transforms.ptransform import PTransform -from apache_beam.transforms.util import assert_that, is_empty class _TestSink(iobase.Sink): http://git-wip-us.apache.org/repos/asf/beam/blob/2070f118/sdks/python/apache_beam/typehints/typed_pipeline_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py b/sdks/python/apache_beam/typehints/typed_pipeline_test.py index 3494cfe..589dc0e 100644 --- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py +++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py @@ -25,7 +25,7 @@ from apache_beam import pvalue from apache_beam import typehints from apache_beam.options.pipeline_options import OptionsContext from apache_beam.testing.test_pipeline import TestPipeline -from apache_beam.transforms.util import assert_that, equal_to +from apache_beam.testing.util import assert_that, equal_to from apache_beam.typehints import WithTypeHints # These test often construct a pipeline as value | PTransform to test side