Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 20687e83a -> 0e5c662b4


Clean up usage of temp directories in _stage_extra_packages

* temp_dir is a required parameter, clarified the doc string
* Renamed similarly named tempdir to temp_dir and creates this new
temp directory under the temp_dir. A separate temp directory is
needed for listing only the downloaded resources at this stage.
* Removed the tempdir clean up stage. Caller is giving the tempdir
and responsible for cleaning it. Only caller (stage_job_resources)
already does this.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c5a49a79
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c5a49a79
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c5a49a79

Branch: refs/heads/python-sdk
Commit: c5a49a797c104021126205e546a3e20e6d5d9709
Parents: 20687e8
Author: Ahmet Altay <al...@google.com>
Authored: Wed Jun 15 18:10:34 2016 -0700
Committer: Davor Bonaci <da...@google.com>
Committed: Fri Jun 17 10:49:05 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/utils/dependency.py | 37 ++++++++----------------
 1 file changed, 12 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c5a49a79/sdks/python/apache_beam/utils/dependency.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/dependency.py 
b/sdks/python/apache_beam/utils/dependency.py
index 05fcfd5..1c6ad9c 100644
--- a/sdks/python/apache_beam/utils/dependency.py
+++ b/sdks/python/apache_beam/utils/dependency.py
@@ -116,19 +116,18 @@ def _dependency_file_download(from_url, to_folder):
   return local_download_file
 
 
-def _stage_extra_packages(extra_packages,
-                          staging_location,
-                          file_copy=_dependency_file_copy, temp_dir=None):
+def _stage_extra_packages(extra_packages, staging_location, temp_dir,
+                          file_copy=_dependency_file_copy):
   """Stages a list of local extra packages.
 
   Args:
     extra_packages: Ordered list of local paths to extra packages to be staged.
     staging_location: Staging location for the packages.
+    temp_dir: Temporary folder where the resource building can happen. Caller
+      is responsible for cleaning up this folder after this function returns.
     file_copy: Callable for copying files. The default version will copy from
       a local file to a GCS location using the gsutil tool available in the
       Google Cloud SDK package.
-    temp_dir: Temporary folder where the resource building can happen. If None
-      then a unique temp directory will be created. Used only for testing.
 
   Returns:
     A list of file names (no paths) for the resources staged. All the files
@@ -139,7 +138,7 @@ def _stage_extra_packages(extra_packages,
       name patterns.
   """
   resources = []
-  tempdir = None
+  staging_temp_dir = None
   local_packages = []
   for package in extra_packages:
     if not os.path.basename(package).endswith('.tar.gz'):
@@ -149,11 +148,11 @@ def _stage_extra_packages(extra_packages,
 
     if not os.path.isfile(package):
       if package.startswith('gs://'):
-        if not tempdir:
-          tempdir = tempfile.mkdtemp()
+        if not staging_temp_dir:
+          staging_temp_dir = tempfile.mkdtemp(dir=temp_dir)
         logging.info('Downloading extra package: %s locally before staging',
                      package)
-        _dependency_file_copy(package, tempdir)
+        _dependency_file_copy(package, staging_temp_dir)
       else:
         raise RuntimeError(
             'The file %s cannot be found. It was specified in the '
@@ -161,9 +160,10 @@ def _stage_extra_packages(extra_packages,
     else:
       local_packages.append(package)
 
-  if tempdir:
+  if staging_temp_dir:
     local_packages.extend(
-        [utils.path.join(tempdir, f) for f in os.listdir(tempdir)])
+        [utils.path.join(staging_temp_dir, f) for f in os.listdir(
+            staging_temp_dir)])
 
   for package in local_packages:
     basename = os.path.basename(package)
@@ -186,18 +186,6 @@ def _stage_extra_packages(extra_packages,
   file_copy(os.path.join(temp_dir, EXTRA_PACKAGES_FILE), staged_path)
   resources.append(EXTRA_PACKAGES_FILE)
 
-  # Remove temp files created by downloading packages from GCS.
-  if tempdir:
-    try:
-      temp_files = os.listdir(tempdir)
-      for temp_file in temp_files:
-        os.remove(utils.path.join(tempdir, temp_file))
-      os.rmdir(tempdir)
-    except OSError as e:
-      logging.info(
-          '%s: (Ignored) Failed to delete all temporary files in %s.',
-          e, tempdir)
-
   return resources
 
 
@@ -311,8 +299,7 @@ def stage_job_resources(
     resources.extend(
         _stage_extra_packages(setup_options.extra_packages,
                               google_cloud_options.staging_location,
-                              file_copy=file_copy,
-                              temp_dir=temp_dir))
+                              temp_dir=temp_dir, file_copy=file_copy))
 
   # Pickle the main session if requested.
   # We will create the pickled main session locally and then copy it to the

Reply via email to