Author: Armin Rigo <[email protected]>
Branch: py3.5-scandir
Changeset: r86387:cfe6bf314382
Date: 2016-08-22 01:03 +0200
http://bitbucket.org/pypy/pypy/changeset/cfe6bf314382/

Log:    Finish the first draft of os.scandir() (missing: Win32)

diff --git a/pypy/module/posix/interp_scandir.py 
b/pypy/module/posix/interp_scandir.py
--- a/pypy/module/posix/interp_scandir.py
+++ b/pypy/module/posix/interp_scandir.py
@@ -1,12 +1,14 @@
+import stat
+from errno import ENOENT
 from rpython.rlib import rgc
-from rpython.rlib import rposix_scandir
+from rpython.rlib import rposix, rposix_scandir, rposix_stat
 
 from pypy.interpreter.gateway import unwrap_spec, WrappedDefault, interp2app
 from pypy.interpreter.error import OperationError, oefmt, wrap_oserror2
 from pypy.interpreter.typedef import TypeDef, GetSetProperty
 from pypy.interpreter.baseobjspace import W_Root
 
-from pypy.module.posix.interp_posix import unwrap_fd
+from pypy.module.posix.interp_posix import unwrap_fd, build_stat_result
 
 
 @unwrap_spec(w_path=WrappedDefault(u"."))
@@ -36,13 +38,18 @@
     w_path_prefix = space.newbytes(path_prefix)
     if not result_is_bytes:
         w_path_prefix = space.fsdecode(w_path_prefix)
-    return W_ScandirIterator(space, dirp, w_path_prefix, result_is_bytes)
+    if rposix.HAVE_FSTATAT:
+        dirfd = rposix.c_dirfd(dirp)
+    else:
+        dirfd = -1
+    return W_ScandirIterator(space, dirp, dirfd, w_path_prefix, 
result_is_bytes)
 
 
 class W_ScandirIterator(W_Root):
-    def __init__(self, space, dirp, w_path_prefix, result_is_bytes):
+    def __init__(self, space, dirp, dirfd, w_path_prefix, result_is_bytes):
         self.space = space
         self.dirp = dirp
+        self.dirfd = dirfd
         self.w_path_prefix = w_path_prefix
         self.result_is_bytes = result_is_bytes
 
@@ -57,6 +64,7 @@
     def fail(self, err=None):
         dirp = self.dirp
         if dirp:
+            self.dirfd = -1
             self.dirp = rposix_scandir.NULL_DIRP
             rposix_scandir.closedir(dirp)
         if err is None:
@@ -84,10 +92,7 @@
                 break
         #
         known_type = rposix_scandir.get_known_type(entry)
-        w_name = space.newbytes(name)
-        if not self.result_is_bytes:
-            w_name = space.fsdecode(w_name)
-        direntry = W_DirEntry(w_name, self.w_path_prefix, known_type)
+        direntry = W_DirEntry(self, name, known_type)
         return space.wrap(direntry)
 
 
@@ -99,13 +104,31 @@
 W_ScandirIterator.typedef.acceptable_as_base_class = False
 
 
+class FileNotFound(Exception):
+    pass
+
+assert 0 <= rposix_scandir.DT_UNKNOWN <= 255
+assert 0 <= rposix_scandir.DT_REG <= 255
+assert 0 <= rposix_scandir.DT_DIR <= 255
+assert 0 <= rposix_scandir.DT_LNK <= 255
+FLAG_STAT  = 256
+FLAG_LSTAT = 512
+
+
 class W_DirEntry(W_Root):
     w_path = None
 
-    def __init__(self, w_name, w_path_prefix, known_type):
+    def __init__(self, scandir_iterator, name, known_type):
+        self.space = scandir_iterator.space
+        self.scandir_iterator = scandir_iterator
+        self.name = name     # always bytes on Posix
+        self.flags = known_type
+        assert known_type == (known_type & 255)
+        #
+        w_name = self.space.newbytes(name)
+        if not scandir_iterator.result_is_bytes:
+            w_name = self.space.fsdecode(w_name)
         self.w_name = w_name
-        self.w_path_prefix = w_path_prefix
-        self.known_type = known_type
 
     def fget_name(self, space):
         return self.w_name
@@ -113,33 +136,126 @@
     def fget_path(self, space):
         w_path = self.w_path
         if w_path is None:
-            w_path = space.add(self.w_path_prefix, self.w_name)
+            w_path_prefix = self.scandir_iterator.w_path_prefix
+            w_path = space.add(w_path_prefix, self.w_name)
             self.w_path = w_path
         return w_path
 
+    # The internal methods, used to implement the public methods at
+    # the end of the class.  Every method only calls methods *before*
+    # it in program order, so there is no cycle.
+
+    def get_lstat(self):
+        """Get the lstat() of the direntry."""
+        if (self.flags & FLAG_LSTAT) == 0:
+            # Unlike CPython, try to use fstatat() if possible
+            dirfd = self.scandir_iterator.dirfd
+            if dirfd != -1:
+                st = rposix_stat.fstatat(self.name, dirfd,
+                                         follow_symlinks=False)
+            else:
+                path = self.space.fsencode_w(self.fget_path(self.space))
+                st = rposix_stat.lstat(path)
+            self.d_lstat = st
+            self.flags |= FLAG_LSTAT
+        return self.d_lstat
+
+    def get_stat(self):
+        """Get the stat() of the direntry.  This is implemented in
+        such a way that it won't do both a stat() and a lstat().
+        """
+        if (self.flags & FLAG_STAT) == 0:
+            # We don't have the 'd_stat'.  If the known_type says the
+            # direntry is not a DT_LNK, then try to get and cache the
+            # 'd_lstat' instead.  Then, or if we already have a
+            # 'd_lstat' from before, *and* if the 'd_lstat' is not a
+            # S_ISLNK, we can reuse it unchanged for 'd_stat'.
+            #
+            # Note how, in the common case where the known_type says
+            # it is a DT_REG or DT_DIR, then we call and cache lstat()
+            # and that's it.  Also note that in a d_type-less OS or on
+            # a filesystem that always answer DT_UNKNOWN, this method
+            # will instead only call at most stat(), but not cache it
+            # as 'd_lstat'.
+            known_type = self.flags & 255
+            if (known_type != rposix_scandir.DT_UNKNOWN and
+                known_type != rposix_scandir.DT_LNK):
+                self.get_lstat()    # fill the 'd_lstat' cache
+                have_lstat = True
+            else:
+                have_lstat = (self.flags & FLAG_LSTAT) != 0
+
+            if have_lstat:
+                # We have the lstat() but not the stat().  They are
+                # the same, unless the 'd_lstat' is a S_IFLNK.
+                must_call_stat = stat.S_ISLNK(self.d_lstat.st_mode)
+            else:
+                must_call_stat = True
+
+            if must_call_stat:
+                # Must call stat().  Try to use fstatat() if possible
+                dirfd = self.scandir_iterator.dirfd
+                if dirfd != -1:
+                    st = rposix_stat.fstatat(self.name, dirfd,
+                                             follow_symlinks=True)
+                else:
+                    path = self.space.fsencode_w(self.fget_path(self.space))
+                    st = rposix_stat.stat(path)
+            else:
+                st = self.d_lstat
+
+            self.d_stat = st
+            self.flags |= FLAG_STAT
+        return self.d_stat
+
+    def get_stat_or_lstat(self, follow_symlinks):
+        if follow_symlinks:
+            return self.get_stat()
+        else:
+            return self.get_lstat()
+
+    def check_mode(self, follow_symlinks):
+        """Get the stat() or lstat() of the direntry, and return the
+        S_IFMT.  If calling stat()/lstat() gives us ENOENT, return -1
+        instead; it is better to give up and answer "no, not this type"
+        to requests, rather than propagate the error.
+        """
+        try:
+            st = self.get_stat_or_lstat(follow_symlinks)
+        except OSError as e:
+            if e.errno == ENOENT:    # not found
+                return -1
+            raise wrap_oserror2(self.space, e, self.fget_path(self.space))
+        return stat.S_IFMT(st.st_mode)
+
     def is_dir(self, follow_symlinks):
-        known_type = self.known_type
+        known_type = self.flags & 255
         if known_type != rposix_scandir.DT_UNKNOWN:
             if known_type == rposix_scandir.DT_DIR:
                 return True
-            if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+            elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
+                pass    # don't know in this case
+            else:
                 return False
-        xxxx
+        return self.check_mode(follow_symlinks) == stat.S_IFDIR
 
     def is_file(self, follow_symlinks):
-        known_type = self.known_type
+        known_type = self.flags & 255
         if known_type != rposix_scandir.DT_UNKNOWN:
             if known_type == rposix_scandir.DT_REG:
                 return True
-            if known_type != rposix_scandir.DT_LNK or not follow_symlinks:
+            elif follow_symlinks and known_type == rposix_scandir.DT_LNK:
+                pass    # don't know in this case
+            else:
                 return False
-        xxxx
+        return self.check_mode(follow_symlinks) == stat.S_IFREG
 
     def is_symlink(self):
-        known_type = self.known_type
+        """Check if the direntry is a symlink.  May get the lstat()."""
+        known_type = self.flags & 255
         if known_type != rposix_scandir.DT_UNKNOWN:
             return known_type == rposix_scandir.DT_LNK
-        xxxx
+        return self.check_mode(follow_symlinks=False) == stat.S_IFLNK
 
     @unwrap_spec(follow_symlinks=int)
     def descr_is_dir(self, space, __kwonly__, follow_symlinks=1):
@@ -155,6 +271,13 @@
         """return True if the entry is a symbolic link; cached per entry"""
         return space.wrap(self.is_symlink())
 
+    @unwrap_spec(follow_symlinks=int)
+    def descr_stat(self, space, __kwonly__, follow_symlinks=1):
+        """return stat_result object for the entry; cached per entry"""
+        st = self.get_stat_or_lstat(follow_symlinks)
+        return build_stat_result(self.space, st)
+
+
 W_DirEntry.typedef = TypeDef(
     'posix.DirEntry',
     name = GetSetProperty(W_DirEntry.fget_name,
@@ -166,5 +289,6 @@
     is_dir = interp2app(W_DirEntry.descr_is_dir),
     is_file = interp2app(W_DirEntry.descr_is_file),
     is_symlink = interp2app(W_DirEntry.descr_is_symlink),
+    stat = interp2app(W_DirEntry.descr_stat),
 )
 W_DirEntry.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/posix/test/test_scandir.py 
b/pypy/module/posix/test/test_scandir.py
--- a/pypy/module/posix/test/test_scandir.py
+++ b/pypy/module/posix/test/test_scandir.py
@@ -78,6 +78,21 @@
         assert type(d.path) is bytes
         assert d.path == b'/' + d.name
 
+    def test_stat1(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir1))
+        assert d.name == 'file1'
+        assert d.stat().st_mode & 0o170000 == 0o100000    # S_IFREG
+        assert d.stat().st_size == 0
+
+    def test_stat4(self):
+        posix = self.posix
+        d = next(posix.scandir(self.dir4))
+        assert d.name == 'sdir4'
+        assert d.stat().st_mode & 0o170000 == 0o040000    # S_IFDIR
+        assert d.stat(follow_symlinks=True).st_mode &0o170000 == 0o040000
+        assert d.stat(follow_symlinks=False).st_mode&0o170000 == 0o120000 
#IFLNK
+
     def test_dir1(self):
         posix = self.posix
         d = next(posix.scandir(self.dir1))
@@ -86,6 +101,8 @@
         assert not d.is_dir()
         assert not d.is_symlink()
         raises(TypeError, d.is_file, True)
+        assert     d.is_file(follow_symlinks=False)
+        assert not d.is_dir(follow_symlinks=False)
 
     def test_dir2(self):
         posix = self.posix
@@ -94,6 +111,8 @@
         assert not d.is_file()
         assert     d.is_dir()
         assert not d.is_symlink()
+        assert not d.is_file(follow_symlinks=False)
+        assert     d.is_dir(follow_symlinks=False)
 
     def test_dir3(self):
         posix = self.posix
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit

Reply via email to