Removed unnecessary throttling of rename parallelism.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/24bb8f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/24bb8f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/24bb8f19

Branch: refs/heads/python-sdk
Commit: 24bb8f19329b3d0c1d0330e0c16c41ab1554684d
Parents: 4b7fe2d
Author: Marian Dvorsky <mari...@google.com>
Authored: Fri Sep 16 10:46:32 2016 -0700
Committer: Robert Bradshaw <rober...@google.com>
Committed: Mon Sep 19 17:39:47 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/fileio.py | 9 ++-------
 1 file changed, 2 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/24bb8f19/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index e3d4dae..d640d50 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -693,11 +693,7 @@ class FileSink(iobase.Sink):
   The output of this write is a PCollection of all written shards.
   """
 
-  # Approximate number of write results be assigned for each rename thread.
-  _WRITE_RESULTS_PER_RENAME_THREAD = 100
-
-  # Max number of threads to be used for renaming even if it means each thread
-  # will process more write results.
+  # Max number of threads to be used for renaming.
   _MAX_RENAME_THREADS = 64
 
   def __init__(self,
@@ -785,8 +781,7 @@ class FileSink(iobase.Sink):
     writer_results = sorted(writer_results)
     num_shards = len(writer_results)
     channel_factory = ChannelFactory()
-    min_threads = min(num_shards / FileSink._WRITE_RESULTS_PER_RENAME_THREAD,
-                      FileSink._MAX_RENAME_THREADS)
+    min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
     num_threads = max(1, min_threads)
     rename_ops = []