Repository: beam Updated Branches: refs/heads/master 7a34042c0 -> 0a5deeac5
[BEAM-1584] Add file clean up util for integration tests Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/a17978a7 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/a17978a7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/a17978a7 Branch: refs/heads/master Commit: a17978a7825433342a7f2371f80d1612c8cda055 Parents: 7a34042 Author: Mark Liu <mark...@google.com> Authored: Thu Aug 3 16:33:07 2017 -0700 Committer: Ahmet Altay <al...@google.com> Committed: Wed Aug 9 17:49:52 2017 -0700 ---------------------------------------------------------------------- .../apache_beam/examples/wordcount_it_test.py | 4 ++ sdks/python/apache_beam/testing/test_utils.py | 18 ++++++ .../apache_beam/testing/test_utils_test.py | 59 ++++++++++++++++++++ sdks/python/apache_beam/utils/retry.py | 7 +++ 4 files changed, 88 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/examples/wordcount_it_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/examples/wordcount_it_test.py b/sdks/python/apache_beam/examples/wordcount_it_test.py index 4bee127..8d2e73e 100644 --- a/sdks/python/apache_beam/examples/wordcount_it_test.py +++ b/sdks/python/apache_beam/examples/wordcount_it_test.py @@ -25,6 +25,7 @@ from hamcrest.core.core.allof import all_of from nose.plugins.attrib import attr from apache_beam.examples import wordcount +from apache_beam.testing.test_utils import delete_files from apache_beam.testing.pipeline_verifiers import FileChecksumMatcher from apache_beam.testing.pipeline_verifiers import PipelineStateMatcher from apache_beam.testing.test_pipeline import TestPipeline @@ -56,6 +57,9 @@ class WordCountIT(unittest.TestCase): extra_opts = {'output': output, 'on_success_matcher': all_of(*pipeline_verifiers)} + # Register clean up before 
pipeline execution + self.addCleanup(delete_files, [output + '*']) + # Get pipeline options from command argument: --test-pipeline-options, # and start pipeline job by calling pipeline main function. wordcount.run(test_pipeline.get_full_options_as_args(**extra_opts)) http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/testing/test_utils.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_utils.py b/sdks/python/apache_beam/testing/test_utils.py index 9feb80e..26ca03d 100644 --- a/sdks/python/apache_beam/testing/test_utils.py +++ b/sdks/python/apache_beam/testing/test_utils.py @@ -24,6 +24,7 @@ import hashlib import imp from mock import Mock, patch +from apache_beam.io.filesystems import FileSystems from apache_beam.utils import retry @@ -71,3 +72,20 @@ def patch_retry(testcase, module): imp.reload(module) testcase.addCleanup(remove_patches) + + +@retry.with_exponential_backoff( + num_retries=3, + retry_filter=retry.retry_on_beam_io_error_filter) +def delete_files(file_paths): + """A function to clean up files or directories using ``FileSystems``. + + Globs are supported in file paths; directories are deleted recursively. + + Args: + file_paths: A list of strings containing file paths or directories. + """ + if len(file_paths) == 0: + raise RuntimeError('Clean up failed. Invalid file path: %s.'
% + file_paths) + FileSystems.delete(file_paths) http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/testing/test_utils_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/testing/test_utils_test.py b/sdks/python/apache_beam/testing/test_utils_test.py new file mode 100644 index 0000000..bee0bd3 --- /dev/null +++ b/sdks/python/apache_beam/testing/test_utils_test.py @@ -0,0 +1,59 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +"""Unit tests for testing utilities.""" + +import logging +import tempfile +import unittest +from mock import patch + +from apache_beam.io.filesystem import BeamIOError +from apache_beam.io.filesystems import FileSystems +from apache_beam.testing import test_utils as utils + + +class TestUtilsTest(unittest.TestCase): + + def setUp(self): + utils.patch_retry(self, utils) + self.tmpdir = tempfile.mkdtemp() + + def test_delete_files_succeeds(self): + f = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False) + assert FileSystems.exists(f.name) + utils.delete_files([f.name]) + assert not FileSystems.exists(f.name) + + @patch.object(FileSystems, 'delete', side_effect=BeamIOError('')) + def test_delete_files_fails_with_io_error(self, mocked_delete): + f = tempfile.NamedTemporaryFile(dir=self.tmpdir, delete=False) + assert FileSystems.exists(f.name) + + with self.assertRaises(BeamIOError): + utils.delete_files([f.name]) + self.assertTrue(mocked_delete.called) + self.assertEqual(mocked_delete.call_count, 4) + + def test_delete_files_fails_with_invalid_arg(self): + with self.assertRaises(RuntimeError): + utils.delete_files([]) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main() http://git-wip-us.apache.org/repos/asf/beam/blob/a17978a7/sdks/python/apache_beam/utils/retry.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/retry.py b/sdks/python/apache_beam/utils/retry.py index 1a8b907..08223b3 100644 --- a/sdks/python/apache_beam/utils/retry.py +++ b/sdks/python/apache_beam/utils/retry.py @@ -31,6 +31,8 @@ import sys import time import traceback +from apache_beam.io.filesystem import BeamIOError + # Protect against environments where apitools library is not available.
# pylint: disable=wrong-import-order, wrong-import-position # TODO(sourabhbajaj): Remove the GCP specific error code to a submodule @@ -99,6 +101,11 @@ def retry_on_server_errors_and_timeout_filter(exception): return retry_on_server_errors_filter(exception) +def retry_on_beam_io_error_filter(exception): + """Filter allowing retries on Beam IO errors.""" + return isinstance(exception, BeamIOError) + + SERVER_ERROR_OR_TIMEOUT_CODES = [408, 500, 502, 503, 504, 598, 599]