This is an automated email from the ASF dual-hosted git repository. altay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 2bcf9fd [BEAM-5319] Python 3 port runners module (#7445) 2bcf9fd is described below commit 2bcf9fd40390dd0192c5456abedfbb290a792866 Author: Robbe Sneyders <robbe.sneyd...@gmail.com> AuthorDate: Tue Jan 15 02:51:31 2019 +0100 [BEAM-5319] Python 3 port runners module (#7445) * Python 3 port runners module * Change random seed in opcounters test to one that works on both Python 2 and 3 --- .../runners/interactive/cache_manager.py | 3 ++- .../runners/interactive/cache_manager_test.py | 4 ++-- .../runners/interactive/interactive_runner_test.py | 8 -------- .../runners/interactive/pipeline_analyzer_test.py | 11 ----------- .../runners/portability/fn_api_runner_test.py | 21 --------------------- .../apache_beam/runners/worker/opcounters_test.py | 7 +------ 6 files changed, 5 insertions(+), 49 deletions(-) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager.py b/sdks/python/apache_beam/runners/interactive/cache_manager.py index 75805d6..e8816fe 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager.py @@ -202,7 +202,8 @@ class SafeFastPrimitivesCoder(coders.Coder): # pylint: disable=deprecated-urllib-function def encode(self, value): - return quote(coders.coders.FastPrimitivesCoder().encode(value)) + return quote(coders.coders.FastPrimitivesCoder().encode(value)).encode( + 'utf-8') def decode(self, value): return coders.coders.FastPrimitivesCoder().decode(unquote_to_bytes(value)) diff --git a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py index ff82f3b..641643f 100644 --- a/sdks/python/apache_beam/runners/interactive/cache_manager_test.py +++ b/sdks/python/apache_beam/runners/interactive/cache_manager_test.py @@ -61,10 +61,10 @@ class FileBasedCacheManagerTest(unittest.TestCase): time.sleep(0.1) cache_file = cache_label + '-1-of-2' - with open(self.cache_manager._path(prefix, cache_file), 'w') as f: + with open(self.cache_manager._path(prefix, cache_file), 'wb') as f: for line in pcoll_list: f.write(cache.SafeFastPrimitivesCoder().encode(line)) - f.write('\n') + f.write(b'\n') def test_exists(self): """Test that CacheManager can correctly tell if the cache exists or not.""" diff --git a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py index 767e06e..9958d21 100644 --- a/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py +++ b/sdks/python/apache_beam/runners/interactive/interactive_runner_test.py @@ -24,8 +24,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import os -import sys import unittest import apache_beam as beam @@ -44,9 +42,6 @@ def print_with_message(msg): class InteractiveRunnerTest(unittest.TestCase): - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_basic(self): p = beam.Pipeline( runner=interactive_runner.InteractiveRunner( @@ -62,9 +57,6 @@ class InteractiveRunnerTest(unittest.TestCase): _ = pc0 | 'Print3' >> beam.Map(print_with_message('Run3')) p.run().wait_until_finish() - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_wordcount(self): class WordExtractingDoFn(beam.DoFn): diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py index 53a4a33..92b5af1 100644 --- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py +++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py @@ -24,8 +24,6 @@ from __future__ import absolute_import from __future__ import division from __future__ import print_function -import os -import sys import unittest import apache_beam as beam @@ -88,9 +86,6 @@ class PipelineAnalyzerTest(unittest.TestCase): self.assertSetEqual(set(transform_proto1.outputs), set(transform_proto2.outputs)) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_basic(self): p = beam.Pipeline(runner=self.runner) @@ -138,9 +133,6 @@ class PipelineAnalyzerTest(unittest.TestCase): # No need to actually execute the second run. - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_word_count(self): p = beam.Pipeline(runner=self.runner) @@ -223,9 +215,6 @@ class PipelineAnalyzerTest(unittest.TestCase): self.assertPipelineEqual(analyzer.pipeline_proto_to_execute(), expected_pipeline_proto) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_read_cache_expansion(self): p = beam.Pipeline(runner=self.runner) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py index dc248d5..f30cd97 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py @@ -147,9 +147,6 @@ class FnApiRunnerTest(unittest.TestCase): assert_that(unnamed.even, equal_to([2]), label='unnamed.even') assert_that(unnamed.odd, equal_to([1, 3]), label='unnamed.odd') - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_pardo_side_inputs(self): def cross_product(elem, sides): for side in sides: @@ -161,9 +158,6 @@ class FnApiRunnerTest(unittest.TestCase): equal_to([('a', 'x'), ('b', 'x'), ('c', 'x'), ('a', 'y'), ('b', 'y'), ('c', 'y')])) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_pardo_windowed_side_inputs(self): with self.create_pipeline() as p: # Now with some windowing. @@ -191,9 +185,6 @@ class FnApiRunnerTest(unittest.TestCase): (9, list(range(7, 10)))]), label='windowed') - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_flattened_side_input(self): with self.create_pipeline() as p: main = p | 'main' >> beam.Create([None]) @@ -204,9 +195,6 @@ class FnApiRunnerTest(unittest.TestCase): main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)), equal_to([(None, {'a': 1, 'b': 2})])) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_gbk_side_input(self): with self.create_pipeline() as p: main = p | 'main' >> beam.Create([None]) @@ -215,9 +203,6 @@ class FnApiRunnerTest(unittest.TestCase): main | beam.Map(lambda a, b: (a, b), beam.pvalue.AsDict(side)), equal_to([(None, {'a': [1]})])) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_multimap_side_input(self): with self.create_pipeline() as p: main = p | 'main' >> beam.Create(['a', 'b']) @@ -229,9 +214,6 @@ class FnApiRunnerTest(unittest.TestCase): beam.pvalue.AsMultiMap(side)), equal_to([('a', [1, 3]), ('b', [2])])) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_pardo_unfusable_side_inputs(self): def cross_product(elem, sides): for side in sides: @@ -414,9 +396,6 @@ class FnApiRunnerTest(unittest.TestCase): | beam.Map(lambda k_vs1: (k_vs1[0], sorted(k_vs1[1])))) assert_that(res, equal_to([('k', [1, 2]), ('k', [100, 101, 102])])) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_large_elements(self): with self.create_pipeline() as p: big = (p diff --git a/sdks/python/apache_beam/runners/worker/opcounters_test.py b/sdks/python/apache_beam/runners/worker/opcounters_test.py index ba87d14..511b9b2 100644 --- a/sdks/python/apache_beam/runners/worker/opcounters_test.py +++ b/sdks/python/apache_beam/runners/worker/opcounters_test.py @@ -20,9 +20,7 @@ from __future__ import division import logging import math -import os import random -import sys import unittest from builtins import object from builtins import range @@ -169,15 +167,12 @@ class OperationCountersTest(unittest.TestCase): total_size += coder.estimate_size(value) self.verify_counters(opcounts, 3, (float(total_size) / 3)) - @unittest.skipIf(sys.version_info[0] == 3 and - os.environ.get('RUN_SKIPPED_PY3_TESTS') != '1', - 'This test still needs to be fixed on Python 3.') def test_should_sample(self): # Order of magnitude more buckets than highest constant in code under test. buckets = [0] * 300 # The seed is arbitrary and exists just to ensure this test is robust. # If you don't like this seed, try your own; the test should still pass. - random.seed(1717) + random.seed(1720) # Do enough runs that the expected hits even in the last buckets # is big enough to expect some statistical smoothing. total_runs = 10 * len(buckets)