Repository: beam Updated Branches: refs/heads/master f87597e10 -> 37e4cc1b8
[BEAM-1892] File size estimation process reporting Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/378b3f5b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/378b3f5b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/378b3f5b Branch: refs/heads/master Commit: 378b3f5bf9886cc390e674a3e600a22ad2e5cb98 Parents: f87597e Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Wed Apr 5 16:49:05 2017 -0700 Committer: Chamikara Jayalath <chamik...@google.com> Committed: Thu Apr 6 11:29:10 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/filebasedsource.py | 1 + sdks/python/apache_beam/io/gcp/gcsfilesystem.py | 2 +- .../apache_beam/io/gcp/gcsfilesystem_test.py | 29 ++++++++++++++- sdks/python/apache_beam/io/gcp/gcsio.py | 16 +++++++- sdks/python/apache_beam/io/gcp/gcsio_test.py | 39 ++++++++++++++++++++ 5 files changed, 83 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 930d958..2e7043f 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -35,6 +35,7 @@ from apache_beam.transforms.display import DisplayDataItem from apache_beam.utils.value_provider import ValueProvider from apache_beam.utils.value_provider import StaticValueProvider from apache_beam.utils.value_provider import check_accessible + MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25 http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py index 5aef0ab..d79630f 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py @@ -65,7 +65,7 @@ class GCSFileSystem(FileSystem): """ if pattern.endswith('/'): pattern += '*' - file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern) + file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit) metadata_list = [FileMetadata(path, size) for path, size in file_sizes.iteritems()] return MatchResult(pattern, metadata_list) http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py index 73a3893..5a1f10d 100644 --- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py @@ -54,7 +54,31 @@ class GCSFileSystemTest(unittest.TestCase): self.assertEqual( set(match_result.metadata_list), expected_results) - gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*') + gcsio_mock.size_of_files_in_glob.assert_called_once_with( + 'gs://bucket/*', None) + + @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') + def test_match_multiples_limit(self, mock_gcsio): + # Prepare mocks. + gcsio_mock = mock.MagicMock() + limit = 1 + gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock + gcsio_mock.size_of_files_in_glob.return_value = { + 'gs://bucket/file1': 1 + } + expected_results = set([ + FileMetadata('gs://bucket/file1', 1) + ]) + file_system = gcsfilesystem.GCSFileSystem() + match_result = file_system.match(['gs://bucket/'], [limit])[0] + self.assertEqual( + set(match_result.metadata_list), + expected_results) + self.assertEqual( + len(match_result.metadata_list), + limit) + gcsio_mock.size_of_files_in_glob.assert_called_once_with( + 'gs://bucket/*', 1) @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') def test_match_multiples_error(self, mock_gcsio): @@ -71,7 +95,8 @@ class GCSFileSystemTest(unittest.TestCase): self.assertTrue( error.exception.message.startswith('Match operation failed')) self.assertEqual(error.exception.exception_details, expected_results) - gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*') + gcsio_mock.size_of_files_in_glob.assert_called_once_with( + 'gs://bucket/*', None) @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio') def test_match_multiple_patterns(self, mock_gcsio): http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py index 0a10094..c76c99d 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio.py +++ b/sdks/python/apache_beam/io/gcp/gcsio.py @@ -29,6 +29,7 @@ import os import Queue import re import threading +import time import traceback from apache_beam.utils import retry @@ -368,7 +369,7 @@ class GcsIO(object): @retry.with_exponential_backoff( retry_filter=retry.retry_on_server_errors_and_timeout_filter) - def size_of_files_in_glob(self, pattern): + def size_of_files_in_glob(self, pattern, limit=None): """Returns the size of all the files in the glob as a dictionary Args: @@ -379,16 +380,29 @@ class GcsIO(object): prefix = re.match('^[^[*?]*', name_pattern).group(0) request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix) file_sizes = {} + counter = 0 + start_time = time.time() + logging.info("Starting the size estimation of the input") while True: response = self.client.objects.List(request) for item in response.items: if fnmatch.fnmatch(item.name, name_pattern): file_name = 'gs://%s/%s' % (item.bucket, item.name) file_sizes[file_name] = item.size + counter += 1 + if limit is not None and counter >= limit: + break + if counter % 10000 == 0: + logging.info("Finished computing size of: %s files", len(file_sizes)) if response.nextPageToken: request.pageToken = response.nextPageToken + if limit is not None and len(file_sizes) >= limit: + break else: break + logging.info( + "Finished the size estimation of the input at %s files. " +\ + "Estimation took %s seconds", counter, time.time() - start_time) return file_sizes http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py index c028f0d..73d2213 100644 --- a/sdks/python/apache_beam/io/gcp/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py @@ -747,6 +747,45 @@ class TestGCSIO(unittest.TestCase): self.assertEqual( self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes) + def test_size_of_files_in_glob_limited(self): + bucket_name = 'gcsio-test' + object_names = [ + ('cow/cat/fish', 2), + ('cow/cat/blubber', 3), + ('cow/dog/blubber', 4), + ('apple/dog/blubber', 5), + ('apple/fish/blubber', 6), + ('apple/fish/blowfish', 7), + ('apple/fish/bambi', 8), + ('apple/fish/balloon', 9), + ('apple/fish/cat', 10), + ('apple/fish/cart', 11), + ('apple/fish/carl', 12), + ('apple/dish/bat', 13), + ('apple/dish/cat', 14), + ('apple/dish/carl', 15), + ] + for (object_name, size) in object_names: + file_name = 'gs://%s/%s' % (bucket_name, object_name) + self._insert_random_file(self.client, file_name, size) + test_cases = [ + ('gs://gcsio-test/cow/*', [ + ('cow/cat/fish', 2), + ('cow/cat/blubber', 3), + ('cow/dog/blubber', 4), + ]), + ('gs://gcsio-test/apple/fish/car?', [ + ('apple/fish/cart', 11), + ('apple/fish/carl', 12), + ]) + ] + # Check if limits are followed correctly + limit = 1 + for file_pattern, expected_object_names in test_cases: + expected_num_items = min(len(expected_object_names), limit) + self.assertEqual( + len(self.gcs.glob(file_pattern, limit)), expected_num_items) + @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed') class TestPipeStream(unittest.TestCase):