https://github.com/python/cpython/commit/69058e20e420181abdc51094474f590d32cd7174
commit: 69058e20e420181abdc51094474f590d32cd7174
branch: main
author: Barney Gale <[email protected]>
committer: barneygale <[email protected]>
date: 2024-06-18T22:15:18+01:00
summary:

GH-73991: Use same signature for `shutil._rmtree_[un]safe()`. (#120517)

Preparatory work for moving `_rmtree_unsafe()` and `_rmtree_safe_fd()` to
`pathlib._os` so that they can be used from both `shutil` and `pathlib`.

Move implementation-specific setup from `rmtree()` into the safe/unsafe
functions, and give them the same signature `(path, dir_fd, onexc)`.

In the tests, mock `os.open` rather than `_rmtree_safe_fd()` to ensure the
FD-based walk is used, and replace a couple references to
`shutil._use_fd_functions` with `shutil.rmtree.avoids_symlink_attacks`
(which has the same value).

No change of behaviour.

files:
M Lib/shutil.py
M Lib/test/test_shutil.py

diff --git a/Lib/shutil.py b/Lib/shutil.py
index b0d49e98cfe5f9..0235f6bae32f14 100644
--- a/Lib/shutil.py
+++ b/Lib/shutil.py
@@ -605,7 +605,22 @@ def _rmtree_islink(st):
         return stat.S_ISLNK(st.st_mode)
 
 # version vulnerable to race conditions
-def _rmtree_unsafe(path, onexc):
+def _rmtree_unsafe(path, dir_fd, onexc):
+    if dir_fd is not None:
+        raise NotImplementedError("dir_fd unavailable on this platform")
+    try:
+        st = os.lstat(path)
+    except OSError as err:
+        onexc(os.lstat, path, err)
+        return
+    try:
+        if _rmtree_islink(st):
+            # symlinks to directories are forbidden, see bug #1669
+            raise OSError("Cannot call rmtree on a symbolic link")
+    except OSError as err:
+        onexc(os.path.islink, path, err)
+        # can't continue even if onexc hook returns
+        return
     def onerror(err):
         if not isinstance(err, FileNotFoundError):
             onexc(os.scandir, err.filename, err)
@@ -635,7 +650,26 @@ def onerror(err):
         onexc(os.rmdir, path, err)
 
 # Version using fd-based APIs to protect against races
-def _rmtree_safe_fd(stack, onexc):
+def _rmtree_safe_fd(path, dir_fd, onexc):
+    # While the unsafe rmtree works fine on bytes, the fd based does not.
+    if isinstance(path, bytes):
+        path = os.fsdecode(path)
+    stack = [(os.lstat, dir_fd, path, None)]
+    try:
+        while stack:
+            _rmtree_safe_fd_step(stack, onexc)
+    finally:
+        # Close any file descriptors still on the stack.
+        while stack:
+            func, fd, path, entry = stack.pop()
+            if func is not os.close:
+                continue
+            try:
+                os.close(fd)
+            except OSError as err:
+                onexc(os.close, path, err)
+
+def _rmtree_safe_fd_step(stack, onexc):
     # Each stack item has four elements:
     # * func: The first operation to perform: os.lstat, os.close or os.rmdir.
     #   Walking a directory starts with an os.lstat() to detect symlinks; in
@@ -710,6 +744,7 @@ def _rmtree_safe_fd(stack, onexc):
                      os.supports_dir_fd and
                      os.scandir in os.supports_fd and
                      os.stat in os.supports_follow_symlinks)
+_rmtree_impl = _rmtree_safe_fd if _use_fd_functions else _rmtree_unsafe
 
 def rmtree(path, ignore_errors=False, onerror=None, *, onexc=None, 
dir_fd=None):
     """Recursively delete a directory tree.
@@ -753,41 +788,7 @@ def onexc(*args):
                     exc_info = type(exc), exc, exc.__traceback__
                 return onerror(func, path, exc_info)
 
-    if _use_fd_functions:
-        # While the unsafe rmtree works fine on bytes, the fd based does not.
-        if isinstance(path, bytes):
-            path = os.fsdecode(path)
-        stack = [(os.lstat, dir_fd, path, None)]
-        try:
-            while stack:
-                _rmtree_safe_fd(stack, onexc)
-        finally:
-            # Close any file descriptors still on the stack.
-            while stack:
-                func, fd, path, entry = stack.pop()
-                if func is not os.close:
-                    continue
-                try:
-                    os.close(fd)
-                except OSError as err:
-                    onexc(os.close, path, err)
-    else:
-        if dir_fd is not None:
-            raise NotImplementedError("dir_fd unavailable on this platform")
-        try:
-            st = os.lstat(path)
-        except OSError as err:
-            onexc(os.lstat, path, err)
-            return
-        try:
-            if _rmtree_islink(st):
-                # symlinks to directories are forbidden, see bug #1669
-                raise OSError("Cannot call rmtree on a symbolic link")
-        except OSError as err:
-            onexc(os.path.islink, path, err)
-            # can't continue even if onexc hook returns
-            return
-        return _rmtree_unsafe(path, onexc)
+    _rmtree_impl(path, dir_fd, onexc)
 
 # Allow introspection of whether or not the hardening against symlink
 # attacks is supported on the current platform
diff --git a/Lib/test/test_shutil.py b/Lib/test/test_shutil.py
index bccb81e0737c57..02ef172613bf94 100644
--- a/Lib/test/test_shutil.py
+++ b/Lib/test/test_shutil.py
@@ -558,25 +558,23 @@ def test_rmtree_uses_safe_fd_version_if_available(self):
                              os.listdir in os.supports_fd and
                              os.stat in os.supports_follow_symlinks)
         if _use_fd_functions:
-            self.assertTrue(shutil._use_fd_functions)
             self.assertTrue(shutil.rmtree.avoids_symlink_attacks)
             tmp_dir = self.mkdtemp()
             d = os.path.join(tmp_dir, 'a')
             os.mkdir(d)
             try:
-                real_rmtree = shutil._rmtree_safe_fd
+                real_open = os.open
                 class Called(Exception): pass
                 def _raiser(*args, **kwargs):
                     raise Called
-                shutil._rmtree_safe_fd = _raiser
+                os.open = _raiser
                 self.assertRaises(Called, shutil.rmtree, d)
             finally:
-                shutil._rmtree_safe_fd = real_rmtree
+                os.open = real_open
         else:
-            self.assertFalse(shutil._use_fd_functions)
             self.assertFalse(shutil.rmtree.avoids_symlink_attacks)
 
-    @unittest.skipUnless(shutil._use_fd_functions, "requires safe rmtree")
+    @unittest.skipUnless(shutil.rmtree.avoids_symlink_attacks, "requires safe 
rmtree")
     def test_rmtree_fails_on_close(self):
         # Test that the error handler is called for failed os.close() and that
         # os.close() is only called once for a file descriptor.
@@ -611,7 +609,7 @@ def onexc(*args):
         self.assertEqual(errors[1][1], dir1)
         self.assertEqual(close_count, 2)
 
-    @unittest.skipUnless(shutil._use_fd_functions, "dir_fd is not supported")
+    @unittest.skipUnless(shutil.rmtree.avoids_symlink_attacks, "dir_fd is not 
supported")
     def test_rmtree_with_dir_fd(self):
         tmp_dir = self.mkdtemp()
         victim = 'killme'
@@ -625,7 +623,7 @@ def test_rmtree_with_dir_fd(self):
         shutil.rmtree(victim, dir_fd=dir_fd)
         self.assertFalse(os.path.exists(fullname))
 
-    @unittest.skipIf(shutil._use_fd_functions, "dir_fd is supported")
+    @unittest.skipIf(shutil.rmtree.avoids_symlink_attacks, "dir_fd is 
supported")
     def test_rmtree_with_dir_fd_unsupported(self):
         tmp_dir = self.mkdtemp()
         with self.assertRaises(NotImplementedError):

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]

Reply via email to