Improve size estimation speed for file samples
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/9caaea0f Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/9caaea0f Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/9caaea0f Branch: refs/heads/python-sdk Commit: 9caaea0f15131ecc64c56ce579361094edc50ae5 Parents: 739a431 Author: Sourabh Bajaj <sourabhba...@google.com> Authored: Wed Nov 30 12:18:04 2016 -0800 Committer: Robert Bradshaw <rober...@gmail.com> Committed: Thu Dec 1 09:10:06 2016 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/filebasedsource.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/9caaea0f/sdks/python/apache_beam/io/filebasedsource.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py index 14eaf27..14c2b06 100644 --- a/sdks/python/apache_beam/io/filebasedsource.py +++ b/sdks/python/apache_beam/io/filebasedsource.py @@ -188,11 +188,12 @@ class FileBasedSource(iobase.BoundedSource): def estimate_size(self): file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)] + # We're reading very few files so we can pass file names to + # _estimate_sizes_of_files without pattern as otherwise we'll try to do + # optimization based on the pattern and might end up reading much more + # data than needed for a few files. if (len(file_names) <= FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT): - # We're reading very few files so we can pass names without pattern - # as otherwise we'll try to do optimization based on the pattern and - # might end up reading much more data than needed for a few files. 
return sum(self._estimate_sizes_of_files(file_names)) else: # Estimating size of a random sample. @@ -202,10 +203,8 @@ class FileBasedSource(iobase.BoundedSource): int(len(file_names) * FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT)) sample = random.sample(file_names, sample_size) - estimate = self._estimate_sizes_of_files(sample, self._pattern) - return int( - sum(estimate) * - (float(len(file_names)) / len(sample))) + estimate = self._estimate_sizes_of_files(sample) + return int(sum(estimate) * (float(len(file_names)) / len(sample))) def read(self, range_tracker): return self._get_concat_source().read(range_tracker)