Repository: incubator-beam Updated Branches: refs/heads/python-sdk 20687e83a -> 0e5c662b4
Clean up usage of temp directories in _stage_extra_packages * temp_dir is now a required parameter; clarified the docstring accordingly. * Renamed the similarly named local variable tempdir to staging_temp_dir, and this new temp directory is now created under temp_dir. A separate temp directory is needed so that only the resources downloaded at this stage are listed. * Removed the tempdir clean-up stage. The caller supplies temp_dir and is responsible for cleaning it up. The only caller (stage_job_resources) already does this. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5a49a79 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5a49a79 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5a49a79 Branch: refs/heads/python-sdk Commit: c5a49a797c104021126205e546a3e20e6d5d9709 Parents: 20687e8 Author: Ahmet Altay <al...@google.com> Authored: Wed Jun 15 18:10:34 2016 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Fri Jun 17 10:49:05 2016 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/utils/dependency.py | 37 ++++++++---------------- 1 file changed, 12 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5a49a79/sdks/python/apache_beam/utils/dependency.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/dependency.py b/sdks/python/apache_beam/utils/dependency.py index 05fcfd5..1c6ad9c 100644 --- a/sdks/python/apache_beam/utils/dependency.py +++ b/sdks/python/apache_beam/utils/dependency.py @@ -116,19 +116,18 @@ def _dependency_file_download(from_url, to_folder): return local_download_file -def _stage_extra_packages(extra_packages, - staging_location, - file_copy=_dependency_file_copy, temp_dir=None): +def _stage_extra_packages(extra_packages, staging_location, temp_dir, + 
file_copy=_dependency_file_copy): """Stages a list of local extra packages. Args: extra_packages: Ordered list of local paths to extra packages to be staged. staging_location: Staging location for the packages. + temp_dir: Temporary folder where the resource building can happen. Caller + is responsible for cleaning up this folder after this function returns. file_copy: Callable for copying files. The default version will copy from a local file to a GCS location using the gsutil tool available in the Google Cloud SDK package. - temp_dir: Temporary folder where the resource building can happen. If None - then a unique temp directory will be created. Used only for testing. Returns: A list of file names (no paths) for the resources staged. All the files @@ -139,7 +138,7 @@ def _stage_extra_packages(extra_packages, name patterns. """ resources = [] - tempdir = None + staging_temp_dir = None local_packages = [] for package in extra_packages: if not os.path.basename(package).endswith('.tar.gz'): @@ -149,11 +148,11 @@ def _stage_extra_packages(extra_packages, if not os.path.isfile(package): if package.startswith('gs://'): - if not tempdir: - tempdir = tempfile.mkdtemp() + if not staging_temp_dir: + staging_temp_dir = tempfile.mkdtemp(dir=temp_dir) logging.info('Downloading extra package: %s locally before staging', package) - _dependency_file_copy(package, tempdir) + _dependency_file_copy(package, staging_temp_dir) else: raise RuntimeError( 'The file %s cannot be found. 
It was specified in the ' @@ -161,9 +160,10 @@ def _stage_extra_packages(extra_packages, else: local_packages.append(package) - if tempdir: + if staging_temp_dir: local_packages.extend( - [utils.path.join(tempdir, f) for f in os.listdir(tempdir)]) + [utils.path.join(staging_temp_dir, f) for f in os.listdir( + staging_temp_dir)]) for package in local_packages: basename = os.path.basename(package) @@ -186,18 +186,6 @@ def _stage_extra_packages(extra_packages, file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path) resources.append(EXTRA_PACKAGES_FILE) - # Remove temp files created by downloading packages from GCS. - if tempdir: - try: - temp_files = os.listdir(tempdir) - for temp_file in temp_files: - os.remove(utils.path.join(tempdir, temp_file)) - os.rmdir(tempdir) - except OSError as e: - logging.info( - '%s: (Ignored) Failed to delete all temporary files in %s.', - e, tempdir) - return resources @@ -311,8 +299,7 @@ def stage_job_resources( resources.extend( _stage_extra_packages(setup_options.extra_packages, google_cloud_options.staging_location, - file_copy=file_copy, - temp_dir=temp_dir)) + temp_dir=temp_dir, file_copy=file_copy)) # Pickle the main session if requested. # We will create the pickled main session locally and then copy it to the