Author: Ronan Lamy <[email protected]>
Branch: py3k
Changeset: r83781:33f8f738c7a4
Date: 2016-04-19 23:47 +0100
http://bitbucket.org/pypy/pypy/changeset/33f8f738c7a4/
Log: Merged in follow_symlinks (pull request #428)
Finish implementing the 3.3 extensions to existing os functions.
diff --git a/pypy/interpreter/baseobjspace.py b/pypy/interpreter/baseobjspace.py
--- a/pypy/interpreter/baseobjspace.py
+++ b/pypy/interpreter/baseobjspace.py
@@ -1638,9 +1638,14 @@
return fsdecode(space, w_obj)
def fsencode_w(self, w_obj):
+ from rpython.rlib import rstring
if self.isinstance_w(w_obj, self.w_unicode):
w_obj = self.fsencode(w_obj)
- return self.bytes0_w(w_obj)
+ result = self.bufferstr_w(w_obj, self.BUF_FULL_RO)
+ if '\x00' in result:
+ raise oefmt(self.w_TypeError,
+ "argument must be a string without NUL characters")
+ return rstring.assert_str0(result)
def fsdecode_w(self, w_obj):
if self.isinstance_w(w_obj, self.w_bytes):
diff --git a/pypy/module/posix/interp_posix.py b/pypy/module/posix/interp_posix.py
--- a/pypy/module/posix/interp_posix.py
+++ b/pypy/module/posix/interp_posix.py
@@ -64,7 +64,7 @@
self.w_obj = w_obj
def as_bytes(self):
- return self.space.bytes0_w(self.w_obj)
+ return self.space.fsencode_w(self.w_obj)
def as_unicode(self):
return self.space.fsdecode_w(self.w_obj)
@@ -83,7 +83,7 @@
fname = FileEncoder(space, w_fname)
return func(fname, *args)
else:
- fname = space.bytes0_w(w_fname)
+ fname = space.fsencode_w(w_fname)
return func(fname, *args)
return dispatch
@@ -112,6 +112,57 @@
return func(fname1, fname2, *args)
return dispatch
+@specialize.arg(0)
+def call_rposix(func, path, *args):
+ """Call a function that takes a filesystem path as its first argument"""
+ if path.as_unicode is not None:
+ return func(path.as_unicode, *args)
+ else:
+ path_b = path.as_bytes
+ assert path_b is not None
+ return func(path.as_bytes, *args)
+
+
+class Path(object):
+ _immutable_fields_ = ['as_fd', 'as_bytes', 'as_unicode', 'w_path']
+
+ def __init__(self, fd, bytes, unicode, w_path):
+ self.as_fd = fd
+ self.as_bytes = bytes
+ self.as_unicode = unicode
+ self.w_path = w_path
+
+@specialize.arg(2)
+def _unwrap_path(space, w_value, allow_fd=True):
+ if space.is_none(w_value):
+ raise oefmt(space.w_TypeError,
+ "can't specify None for path argument")
+ if _WIN32:
+ try:
+ path_u = space.unicode_w(w_value)
+ return Path(-1, None, path_u, w_value)
+ except OperationError:
+ pass
+ try:
+ path_b = space.fsencode_w(w_value)
+ return Path(-1, path_b, None, w_value)
+ except OperationError:
+ if allow_fd:
+ fd = unwrap_fd(space, w_value, "string, bytes or integer")
+ return Path(fd, None, None, w_value)
+ raise oefmt(space.w_TypeError, "illegal type for path parameter")
+
+class _PathOrFd(Unwrapper):
+ def unwrap(self, space, w_value):
+ return _unwrap_path(space, w_value, allow_fd=True)
+
+class _JustPath(Unwrapper):
+ def unwrap(self, space, w_value):
+ return _unwrap_path(space, w_value, allow_fd=False)
+
+def path_or_fd(allow_fd=True):
+ return _PathOrFd if allow_fd else _JustPath
+
if hasattr(rposix, 'AT_FDCWD'):
DEFAULT_DIR_FD = rposix.AT_FDCWD
@@ -119,8 +170,20 @@
DEFAULT_DIR_FD = -100
DIR_FD_AVAILABLE = False
-def unwrap_fd(space, w_value):
- return space.c_int_w(w_value)
+@specialize.arg(2)
+def unwrap_fd(space, w_value, allowed_types='integer'):
+ try:
+ result = space.c_int_w(w_value)
+ except OperationError as e:
+ if not e.match(space, space.w_OverflowError):
+ raise oefmt(space.w_TypeError,
+ "argument should be %s, not %T", allowed_types, w_value)
+ else:
+ raise
+ if result == -1:
+ # -1 is used as sentinel value for not a fd
+ raise oefmt(space.w_ValueError, "invalid file descriptor: -1")
+ return result
def _unwrap_dirfd(space, w_value):
if space.is_none(w_value):
@@ -354,8 +417,11 @@
else:
return build_stat_result(space, st)
-@unwrap_spec(dir_fd=DirFD(available=False), follow_symlinks=kwonly(bool))
-def stat(space, w_path, dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
+@unwrap_spec(
+ path=path_or_fd(allow_fd=True),
+ dir_fd=DirFD(rposix.HAVE_FSTATAT),
+ follow_symlinks=kwonly(bool))
+def stat(space, path, dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
"""stat(path, *, dir_fd=None, follow_symlinks=True) -> stat result
Perform a stat system call on the given path.
@@ -371,27 +437,43 @@
link points to.
It is an error to use dir_fd or follow_symlinks when specifying path as
an open file descriptor."""
+ return do_stat(space, "stat", path, dir_fd, follow_symlinks)
+
+@specialize.arg(1)
+def do_stat(space, funcname, path, dir_fd, follow_symlinks):
+ """Common implementation for stat() and lstat()"""
try:
- st = dispatch_filename(rposix_stat.stat, 0,
- allow_fd_fn=rposix_stat.fstat)(space, w_path)
- except OSError, e:
- raise wrap_oserror2(space, e, w_path)
+ if path.as_fd != -1:
+ if dir_fd != DEFAULT_DIR_FD:
+ raise oefmt(space.w_ValueError,
+ "%s: can't specify both dir_fd and fd", funcname)
+ if not follow_symlinks:
+ raise oefmt(space.w_ValueError,
+ "%s: cannot use fd and follow_symlinks together", funcname)
+ st = rposix_stat.fstat(path.as_fd)
+ elif follow_symlinks and dir_fd == DEFAULT_DIR_FD:
+ st = call_rposix(rposix_stat.stat, path)
+ elif not follow_symlinks and dir_fd == DEFAULT_DIR_FD:
+ st = call_rposix(rposix_stat.lstat, path)
+ elif rposix.HAVE_FSTATAT:
+ st = call_rposix(rposix_stat.fstatat, path, dir_fd, follow_symlinks)
+ else:
+ raise oefmt(space.w_NotImplementedError,
+ "%s: unsupported argument combination", funcname)
+ except OSError as e:
+ raise wrap_oserror2(space, e, path.w_path)
else:
return build_stat_result(space, st)
-@unwrap_spec(dir_fd=DirFD(available=False))
-def lstat(space, w_path, dir_fd=DEFAULT_DIR_FD):
+@unwrap_spec(
+ path=path_or_fd(allow_fd=False),
+ dir_fd=DirFD(rposix.HAVE_FSTATAT))
+def lstat(space, path, dir_fd=DEFAULT_DIR_FD):
"""lstat(path, *, dir_fd=None) -> stat result
Like stat(), but do not follow symbolic links.
Equivalent to stat(path, follow_symlinks=False)."""
-
- try:
- st = dispatch_filename(rposix_stat.lstat)(space, w_path)
- except OSError, e:
- raise wrap_oserror2(space, e, w_path)
- else:
- return build_stat_result(space, st)
+ return do_stat(space, "lstat", path, dir_fd, False)
class StatState(object):
def __init__(self, space):
@@ -432,7 +514,9 @@
On some platforms, path may also be specified as an open file descriptor.
If this functionality is unavailable, using it raises an exception."""
try:
- st = dispatch_filename(rposix_stat.statvfs)(space, w_path)
+ st = dispatch_filename(
+ rposix_stat.statvfs,
+ allow_fd_fn=rposix_stat.fstatvfs)(space, w_path)
except OSError as e:
raise wrap_oserror2(space, e, w_path)
else:
@@ -1054,8 +1138,10 @@
raise wrap_oserror(space, e)
-@unwrap_spec(dir_fd=DirFD(rposix.HAVE_READLINKAT))
-def readlink(space, w_path, dir_fd=DEFAULT_DIR_FD):
+@unwrap_spec(
+ path=path_or_fd(allow_fd=False),
+ dir_fd=DirFD(rposix.HAVE_READLINKAT))
+def readlink(space, path, dir_fd=DEFAULT_DIR_FD):
"""readlink(path, *, dir_fd=None) -> path
Return a string representing the path to which the symbolic link points.
@@ -1064,20 +1150,15 @@
and path should be relative; path will then be relative to that directory.
dir_fd may not be implemented on your platform.
If it is unavailable, using it will raise a NotImplementedError."""
- is_unicode = space.isinstance_w(w_path, space.w_unicode)
- if is_unicode:
- path = space.fsencode_w(w_path)
- else:
- path = space.bytes0_w(w_path)
try:
if dir_fd == DEFAULT_DIR_FD:
- result = rposix.readlink(path)
+ result = call_rposix(rposix.readlink, path)
else:
- result = rposix.readlinkat(path, dir_fd)
- except OSError, e:
- raise wrap_oserror2(space, e, w_path)
+ result = call_rposix(rposix.readlinkat, path, dir_fd)
+ except OSError as e:
+ raise wrap_oserror2(space, e, path.w_path)
w_result = space.wrapbytes(result)
- if is_unicode:
+ if space.isinstance_w(path.w_path, space.w_unicode):
return space.fsdecode(w_result)
return w_result
@@ -1258,9 +1339,11 @@
return space.wrap(ret)
-@unwrap_spec(w_times=WrappedDefault(None), w_ns=kwonly(WrappedDefault(None)),
+@unwrap_spec(
+ path=path_or_fd(allow_fd=rposix.HAVE_FUTIMENS),
+ w_times=WrappedDefault(None), w_ns=kwonly(WrappedDefault(None)),
dir_fd=DirFD(rposix.HAVE_UTIMENSAT), follow_symlinks=kwonly(bool))
-def utime(space, w_path, w_times, w_ns, dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
+def utime(space, path, w_times, w_ns, dir_fd=DEFAULT_DIR_FD, follow_symlinks=True):
"""utime(path, times=None, *, ns=None, dir_fd=None, follow_symlinks=True)
Set the access and modified time of path.
@@ -1290,47 +1373,11 @@
not space.is_w(w_ns, space.w_None)):
raise oefmt(space.w_ValueError,
"utime: you may specify either 'times' or 'ns' but not both")
-
- if rposix.HAVE_UTIMENSAT:
- path = space.fsencode_w(w_path)
- try:
- _utimensat(space, path, w_times, w_ns, dir_fd, follow_symlinks)
- return
- except OSError, e:
- raise wrap_oserror2(space, e, w_path)
-
- if not follow_symlinks:
- raise argument_unavailable(space, "utime", "follow_symlinks")
-
- if not space.is_w(w_ns, space.w_None):
- raise oefmt(space.w_NotImplementedError,
- "utime: 'ns' unsupported on this platform on PyPy")
- if space.is_w(w_times, space.w_None):
- try:
- dispatch_filename(rposix.utime, 1)(space, w_path, None)
- return
- except OSError, e:
- raise wrap_oserror2(space, e, w_path)
- try:
- msg = "utime() arg 2 must be a tuple (atime, mtime) or None"
- args_w = space.fixedview(w_times)
- if len(args_w) != 2:
- raise OperationError(space.w_TypeError, space.wrap(msg))
- actime = space.float_w(args_w[0], allow_conversion=False)
- modtime = space.float_w(args_w[1], allow_conversion=False)
- dispatch_filename(rposix.utime, 2)(space, w_path, (actime, modtime))
- except OSError, e:
- raise wrap_oserror2(space, e, w_path)
- except OperationError, e:
- if not e.match(space, space.w_TypeError):
- raise
- raise OperationError(space.w_TypeError, space.wrap(msg))
-
-
-def _utimensat(space, path, w_times, w_ns, dir_fd, follow_symlinks):
+ utime_now = False
if space.is_w(w_times, space.w_None) and space.is_w(w_ns, space.w_None):
atime_s = mtime_s = 0
- atime_ns = mtime_ns = rposix.UTIME_NOW
+ atime_ns = mtime_ns = 0
+ utime_now = True
elif not space.is_w(w_times, space.w_None):
times_w = space.fixedview(w_times)
if len(times_w) != 2:
@@ -1346,9 +1393,75 @@
atime_s, atime_ns = convert_ns(space, args_w[0])
mtime_s, mtime_ns = convert_ns(space, args_w[1])
- rposix.utimensat(
- path, atime_s, atime_ns, mtime_s, mtime_ns,
- dir_fd=dir_fd, follow_symlinks=follow_symlinks)
+ if path.as_fd != -1:
+ if dir_fd != DEFAULT_DIR_FD:
+ raise oefmt(space.w_ValueError,
+ "utime: can't specify both dir_fd and fd")
+ if not follow_symlinks:
+ raise oefmt(space.w_ValueError,
+ "utime: cannot use fd and follow_symlinks together")
+ if utime_now:
+ atime_ns = mtime_ns = rposix.UTIME_NOW
+ try:
+ rposix.futimens(path.as_fd, atime_s, atime_ns, mtime_s, mtime_ns)
+ return
+ except OSError as e:
+ # CPython's Modules/posixmodule.c::posix_utime() has this comment:
+ # /* Avoid putting the file name into the error here,
+ # as that may confuse the user into believing that
+ # something is wrong with the file, when it also
+ # could be the time stamp that gives a problem. */
+ # so we use wrap_oserror() instead of wrap_oserror2() here
+ raise wrap_oserror(space, e)
+
+ if rposix.HAVE_UTIMENSAT:
+ path_b = path.as_bytes
+ if path_b is None:
+ raise oefmt(space.w_NotImplementedError,
+ "utime: unsupported value for 'path'")
+ try:
+ if utime_now:
+ rposix.utimensat(
+ path_b, 0, rposix.UTIME_NOW, 0, rposix.UTIME_NOW,
+ dir_fd=dir_fd, follow_symlinks=follow_symlinks)
+ else:
+ rposix.utimensat(
+ path_b, atime_s, atime_ns, mtime_s, mtime_ns,
+ dir_fd=dir_fd, follow_symlinks=follow_symlinks)
+ return
+ except OSError as e:
+ # see comment above
+ raise wrap_oserror(space, e)
+
+ if not follow_symlinks:
+ raise argument_unavailable(space, "utime", "follow_symlinks")
+
+ if not space.is_w(w_ns, space.w_None):
+ raise oefmt(space.w_NotImplementedError,
+ "utime: 'ns' unsupported on this platform on PyPy")
+ if utime_now:
+ try:
+ call_rposix(rposix.utime, path, None)
+ except OSError as e:
+ # see comment above
+ raise wrap_oserror(space, e)
+ try:
+ msg = "utime() arg 2 must be a tuple (atime, mtime) or None"
+ args_w = space.fixedview(w_times)
+ if len(args_w) != 2:
+ raise OperationError(space.w_TypeError, space.wrap(msg))
+ actime = space.float_w(args_w[0], allow_conversion=False)
+ modtime = space.float_w(args_w[1], allow_conversion=False)
+ except OperationError, e:
+ if not e.match(space, space.w_TypeError):
+ raise
+ raise OperationError(space.w_TypeError, space.wrap(msg))
+ try:
+ call_rposix(rposix.utime, path, (actime, modtime))
+ except OSError as e:
+ # see comment above
+ raise wrap_oserror(space, e)
+
def convert_seconds(space, w_time):
if space.isinstance_w(w_time, space.w_float):
@@ -1745,13 +1858,19 @@
raise wrap_oserror(space, e)
return space.wrap(res)
-@unwrap_spec(path='str0')
+@unwrap_spec(path=path_or_fd(allow_fd=hasattr(os, 'fpathconf')))
def pathconf(space, path, w_name):
num = confname_w(space, w_name, os.pathconf_names)
- try:
- res = os.pathconf(path, num)
- except OSError, e:
- raise wrap_oserror(space, e)
+ if path.as_fd != -1:
+ try:
+ res = os.fpathconf(path.as_fd, num)
+ except OSError, e:
+ raise wrap_oserror(space, e)
+ else:
+ try:
+ res = os.pathconf(path.as_bytes, num)
+ except OSError, e:
+ raise wrap_oserror2(space, e, path.w_path)
return space.wrap(res)
def confstr(space, w_name):
diff --git a/pypy/module/posix/test/test_posix2.py b/pypy/module/posix/test/test_posix2.py
--- a/pypy/module/posix/test/test_posix2.py
+++ b/pypy/module/posix/test/test_posix2.py
@@ -174,6 +174,10 @@
import stat
st = self.posix.stat(".")
assert stat.S_ISDIR(st.st_mode)
+ st = self.posix.stat(b".")
+ assert stat.S_ISDIR(st.st_mode)
+ st = self.posix.stat(bytearray(b"."))
+ assert stat.S_ISDIR(st.st_mode)
st = self.posix.lstat(".")
assert stat.S_ISDIR(st.st_mode)
@@ -185,6 +189,12 @@
assert exc.value.errno == errno.ENOENT
assert exc.value.filename == "nonexistentdir/nonexistentfile"
+ excinfo = raises(TypeError, self.posix.stat, None)
+ assert "can't specify None" in str(excinfo.value)
+ excinfo = raises(TypeError, self.posix.stat, 2.)
+ assert "should be string, bytes or integer, not float" in str(excinfo.value)
+ raises(ValueError, self.posix.stat, -1)
+
if hasattr(__import__(os.name), "statvfs"):
def test_statvfs(self):
st = self.posix.statvfs(".")
@@ -250,7 +260,7 @@
try:
self.posix.utime('qowieuqw/oeiu', arg)
except OSError as e:
- assert e.filename == 'qowieuqw/oeiu'
+ pass
else:
assert 0
_______________________________________________
pypy-commit mailing list
pypy-commit@python.org
https://mail.python.org/mailman/listinfo/pypy-commit