https://github.com/python/cpython/commit/ad90c5fabc415d4e46205947cceda82893ec1460 commit: ad90c5fabc415d4e46205947cceda82893ec1460 branch: main author: Barney Gale <barney.g...@gmail.com> committer: barneygale <barney.g...@gmail.com> date: 2025-03-11T20:54:22Z summary:
GH-130614: pathlib ABCs: revise test suite for readable paths (#131018) Test `pathlib.types._ReadablePath` in a dedicated test module. These tests cover `ReadableZipPath`, `ReadableLocalPath` and `Path`, where the former two classes are implementations of `_ReadablePath` for use in tests. files: A Lib/test/test_pathlib/support/local_path.py A Lib/test/test_pathlib/support/zip_path.py A Lib/test/test_pathlib/test_read.py M Lib/test/test_pathlib/test_pathlib.py M Lib/test/test_pathlib/test_pathlib_abc.py diff --git a/Lib/test/test_pathlib/support/local_path.py b/Lib/test/test_pathlib/support/local_path.py new file mode 100644 index 00000000000000..1d2b03df225978 --- /dev/null +++ b/Lib/test/test_pathlib/support/local_path.py @@ -0,0 +1,145 @@ +""" +Implementation of ReadablePath for local paths, for use in pathlib tests. + +LocalPathGround is also defined here. It helps establish the "ground truth" +about local paths in tests. +""" + +import os +import pathlib.types + +from test.support import os_helper +from test.test_pathlib.support.lexical_path import LexicalPath + + +class LocalPathGround: + can_symlink = os_helper.can_symlink() + + def __init__(self, path_cls): + self.path_cls = path_cls + + def setup(self, local_suffix=""): + root = self.path_cls(os_helper.TESTFN + local_suffix) + os.mkdir(root) + return root + + def teardown(self, root): + os_helper.rmtree(root) + + def create_file(self, p, data=b''): + with open(p, 'wb') as f: + f.write(data) + + def create_dir(self, p): + os.mkdir(p) + + def create_symlink(self, p, target): + os.symlink(target, p) + + def create_hierarchy(self, p): + os.mkdir(os.path.join(p, 'dirA')) + os.mkdir(os.path.join(p, 'dirB')) + os.mkdir(os.path.join(p, 'dirC')) + os.mkdir(os.path.join(p, 'dirC', 'dirD')) + with open(os.path.join(p, 'fileA'), 'wb') as f: + f.write(b"this is file A\n") + with open(os.path.join(p, 'dirB', 'fileB'), 'wb') as f: + f.write(b"this is file B\n") + with open(os.path.join(p, 'dirC', 'fileC'), 'wb') as f: + 
f.write(b"this is file C\n") + with open(os.path.join(p, 'dirC', 'novel.txt'), 'wb') as f: + f.write(b"this is a novel\n") + with open(os.path.join(p, 'dirC', 'dirD', 'fileD'), 'wb') as f: + f.write(b"this is file D\n") + if self.can_symlink: + # Relative symlinks. + os.symlink('fileA', os.path.join(p, 'linkA')) + os.symlink('non-existing', os.path.join(p, 'brokenLink')) + os.symlink('dirB', + os.path.join(p, 'linkB'), + target_is_directory=True) + os.symlink(os.path.join('..', 'dirB'), + os.path.join(p, 'dirA', 'linkC'), + target_is_directory=True) + # Broken symlink (pointing to itself). + os.symlink('brokenLinkLoop', os.path.join(p, 'brokenLinkLoop')) + + isdir = staticmethod(os.path.isdir) + isfile = staticmethod(os.path.isfile) + islink = staticmethod(os.path.islink) + readlink = staticmethod(os.readlink) + + def readtext(self, p): + with open(p, 'r') as f: + return f.read() + + def readbytes(self, p): + with open(p, 'rb') as f: + return f.read() + + +class LocalPathInfo(pathlib.types.PathInfo): + """ + Simple implementation of PathInfo for a local path + """ + __slots__ = ('_path', '_exists', '_is_dir', '_is_file', '_is_symlink') + + def __init__(self, path): + self._path = str(path) + self._exists = None + self._is_dir = None + self._is_file = None + self._is_symlink = None + + def exists(self, *, follow_symlinks=True): + """Whether this path exists.""" + if not follow_symlinks and self.is_symlink(): + return True + if self._exists is None: + self._exists = os.path.exists(self._path) + return self._exists + + def is_dir(self, *, follow_symlinks=True): + """Whether this path is a directory.""" + if not follow_symlinks and self.is_symlink(): + return False + if self._is_dir is None: + self._is_dir = os.path.isdir(self._path) + return self._is_dir + + def is_file(self, *, follow_symlinks=True): + """Whether this path is a regular file.""" + if not follow_symlinks and self.is_symlink(): + return False + if self._is_file is None: + self._is_file = 
os.path.isfile(self._path) + return self._is_file + + def is_symlink(self): + """Whether this path is a symbolic link.""" + if self._is_symlink is None: + self._is_symlink = os.path.islink(self._path) + return self._is_symlink + + +class ReadableLocalPath(pathlib.types._ReadablePath, LexicalPath): + """ + Simple implementation of a ReadablePath class for local filesystem paths. + """ + __slots__ = ('info',) + + def __init__(self, *pathsegments): + super().__init__(*pathsegments) + self.info = LocalPathInfo(self) + + def __fspath__(self): + return str(self) + + def __open_rb__(self, buffering=-1): + return open(self, 'rb') + + def iterdir(self): + return (self / name for name in os.listdir(self)) + + def readlink(self): + return self.with_segments(os.readlink(self)) diff --git a/Lib/test/test_pathlib/support/zip_path.py b/Lib/test/test_pathlib/support/zip_path.py new file mode 100644 index 00000000000000..ab6a929fc4a504 --- /dev/null +++ b/Lib/test/test_pathlib/support/zip_path.py @@ -0,0 +1,278 @@ +""" +Implementation of ReadablePath for zip file members, for use in pathlib tests. + +ZipPathGround is also defined here. It helps establish the "ground truth" +about zip file members in tests. 
+""" + +import errno +import io +import pathlib.types +import posixpath +import stat +import zipfile +from stat import S_IFMT, S_ISDIR, S_ISREG, S_ISLNK + + +class ZipPathGround: + can_symlink = True + + def __init__(self, path_cls): + self.path_cls = path_cls + + def setup(self, local_suffix=""): + return self.path_cls(zip_file=zipfile.ZipFile(io.BytesIO(), "w")) + + def teardown(self, root): + root.zip_file.close() + + def create_file(self, path, data=b''): + path.zip_file.writestr(str(path), data) + + def create_dir(self, path): + path.zip_file.mkdir(str(path)) + + def create_symlink(self, path, target): + zip_info = zipfile.ZipInfo(str(path)) + zip_info.external_attr = stat.S_IFLNK << 16 + path.zip_file.writestr(zip_info, target.encode()) + + def create_hierarchy(self, p): + # Add regular files + self.create_file(p.joinpath('fileA'), b'this is file A\n') + self.create_file(p.joinpath('dirB/fileB'), b'this is file B\n') + self.create_file(p.joinpath('dirC/fileC'), b'this is file C\n') + self.create_file(p.joinpath('dirC/dirD/fileD'), b'this is file D\n') + self.create_file(p.joinpath('dirC/novel.txt'), b'this is a novel\n') + # Add symlinks + self.create_symlink(p.joinpath('linkA'), 'fileA') + self.create_symlink(p.joinpath('linkB'), 'dirB') + self.create_symlink(p.joinpath('dirA/linkC'), '../dirB') + self.create_symlink(p.joinpath('brokenLink'), 'non-existing') + self.create_symlink(p.joinpath('brokenLinkLoop'), 'brokenLinkLoop') + + def readtext(self, p): + with p.zip_file.open(str(p), 'r') as f: + f = io.TextIOWrapper(f) + return f.read() + + def readbytes(self, p): + with p.zip_file.open(str(p), 'r') as f: + return f.read() + + readlink = readtext + + def isdir(self, p): + path_str = str(p) + "/" + return path_str in p.zip_file.NameToInfo + + def isfile(self, p): + info = p.zip_file.NameToInfo.get(str(p)) + if info is None: + return False + return not stat.S_ISLNK(info.external_attr >> 16) + + def islink(self, p): + info = p.zip_file.NameToInfo.get(str(p)) + 
if info is None: + return False + return stat.S_ISLNK(info.external_attr >> 16) + + +class MissingZipPathInfo: + """ + PathInfo implementation that is used when a zip file member is missing. + """ + __slots__ = () + + def exists(self, follow_symlinks=True): + return False + + def is_dir(self, follow_symlinks=True): + return False + + def is_file(self, follow_symlinks=True): + return False + + def is_symlink(self): + return False + + def resolve(self): + return self + + +missing_zip_path_info = MissingZipPathInfo() + + +class ZipPathInfo: + """ + PathInfo implementation for an existing zip file member. + """ + __slots__ = ('zip_file', 'zip_info', 'parent', 'children') + + def __init__(self, zip_file, parent=None): + self.zip_file = zip_file + self.zip_info = None + self.parent = parent or self + self.children = {} + + def exists(self, follow_symlinks=True): + if follow_symlinks and self.is_symlink(): + return self.resolve().exists() + return True + + def is_dir(self, follow_symlinks=True): + if follow_symlinks and self.is_symlink(): + return self.resolve().is_dir() + elif self.zip_info is None: + return True + elif fmt := S_IFMT(self.zip_info.external_attr >> 16): + return S_ISDIR(fmt) + else: + return self.zip_info.filename.endswith('/') + + def is_file(self, follow_symlinks=True): + if follow_symlinks and self.is_symlink(): + return self.resolve().is_file() + elif self.zip_info is None: + return False + elif fmt := S_IFMT(self.zip_info.external_attr >> 16): + return S_ISREG(fmt) + else: + return not self.zip_info.filename.endswith('/') + + def is_symlink(self): + if self.zip_info is None: + return False + elif fmt := S_IFMT(self.zip_info.external_attr >> 16): + return S_ISLNK(fmt) + else: + return False + + def resolve(self, path=None, create=False, follow_symlinks=True): + """ + Traverse zip hierarchy (parents, children and symlinks) starting + from this PathInfo. 
This is called from three places: + + - When a zip file member is added to ZipFile.filelist, this method + populates the ZipPathInfo tree (using create=True). + - When ReadableZipPath.info is accessed, this method finds a + ZipPathInfo entry for the path without resolving any final symlink + (using follow_symlinks=False). + - When ZipPathInfo methods are called with follow_symlinks=True, this + method resolves any symlink in the final path position. + """ + link_count = 0 + stack = path.split('/')[::-1] if path else [] + info = self + while True: + if info.is_symlink() and (follow_symlinks or stack): + link_count += 1 + if link_count >= 40: + return missing_zip_path_info # Symlink loop! + path = info.zip_file.read(info.zip_info).decode() + stack += path.split('/')[::-1] if path else [] + info = info.parent + + if stack: + name = stack.pop() + else: + return info + + if name == '..': + info = info.parent + elif name and name != '.': + if name not in info.children: + if create: + info.children[name] = ZipPathInfo(info.zip_file, info) + else: + return missing_zip_path_info # No such child! + info = info.children[name] + + +class ZipFileList: + """ + `list`-like object that we inject as `ZipFile.filelist`. We maintain a + tree of `ZipPathInfo` objects representing the zip file members. + """ + + __slots__ = ('tree', '_items') + + def __init__(self, zip_file): + self.tree = ZipPathInfo(zip_file) + self._items = [] + for item in zip_file.filelist: + self.append(item) + + def __len__(self): + return len(self._items) + + def __iter__(self): + return iter(self._items) + + def append(self, item): + self._items.append(item) + self.tree.resolve(item.filename, create=True).zip_info = item + + +class ReadableZipPath(pathlib.types._ReadablePath): + """ + Simple implementation of a ReadablePath class for .zip files.
+ """ + + __slots__ = ('_segments', 'zip_file') + parser = posixpath + + def __init__(self, *pathsegments, zip_file): + self._segments = pathsegments + self.zip_file = zip_file + if not isinstance(zip_file.filelist, ZipFileList): + zip_file.filelist = ZipFileList(zip_file) + + def __hash__(self): + return hash((str(self), self.zip_file)) + + def __eq__(self, other): + if not isinstance(other, ReadableZipPath): + return NotImplemented + return str(self) == str(other) and self.zip_file is other.zip_file + + def __str__(self): + if not self._segments: + return '' + return self.parser.join(*self._segments) + + def __repr__(self): + return f'{type(self).__name__}({str(self)!r}, zip_file={self.zip_file!r})' + + def with_segments(self, *pathsegments): + return type(self)(*pathsegments, zip_file=self.zip_file) + + @property + def info(self): + tree = self.zip_file.filelist.tree + return tree.resolve(str(self), follow_symlinks=False) + + def __open_rb__(self, buffering=-1): + info = self.info.resolve() + if not info.exists(): + raise FileNotFoundError(errno.ENOENT, "File not found", self) + elif info.is_dir(): + raise IsADirectoryError(errno.EISDIR, "Is a directory", self) + return self.zip_file.open(info.zip_info, 'r') + + def iterdir(self): + info = self.info.resolve() + if not info.exists(): + raise FileNotFoundError(errno.ENOENT, "File not found", self) + elif not info.is_dir(): + raise NotADirectoryError(errno.ENOTDIR, "Not a directory", self) + return (self / name for name in info.children) + + def readlink(self): + info = self.info + if not info.exists(): + raise FileNotFoundError(errno.ENOENT, "File not found", self) + elif not info.is_symlink(): + raise OSError(errno.EINVAL, "Not a symlink", self) + return self.with_segments(self.zip_file.read(info.zip_info).decode()) diff --git a/Lib/test/test_pathlib/test_pathlib.py b/Lib/test/test_pathlib/test_pathlib.py index aff155cf1100bb..b9725af0c988f7 100644 --- a/Lib/test/test_pathlib/test_pathlib.py +++ 
b/Lib/test/test_pathlib/test_pathlib.py @@ -2429,6 +2429,33 @@ def test_symlink_to_unsupported(self): with self.assertRaises(pathlib.UnsupportedOperation): q.symlink_to(p) + def test_info_exists_caching(self): + p = self.cls(self.base) + q = p / 'myfile' + self.assertFalse(q.info.exists()) + self.assertFalse(q.info.exists(follow_symlinks=False)) + q.write_text('hullo') + self.assertFalse(q.info.exists()) + self.assertFalse(q.info.exists(follow_symlinks=False)) + + def test_info_is_dir_caching(self): + p = self.cls(self.base) + q = p / 'mydir' + self.assertFalse(q.info.is_dir()) + self.assertFalse(q.info.is_dir(follow_symlinks=False)) + q.mkdir() + self.assertFalse(q.info.is_dir()) + self.assertFalse(q.info.is_dir(follow_symlinks=False)) + + def test_info_is_file_caching(self): + p = self.cls(self.base) + q = p / 'myfile' + self.assertFalse(q.info.is_file()) + self.assertFalse(q.info.is_file(follow_symlinks=False)) + q.write_text('hullo') + self.assertFalse(q.info.is_file()) + self.assertFalse(q.info.is_file(follow_symlinks=False)) + @needs_symlinks def test_info_is_symlink_caching(self): p = self.cls(self.base) diff --git a/Lib/test/test_pathlib/test_pathlib_abc.py b/Lib/test/test_pathlib/test_pathlib_abc.py index a93af961b503a1..02e2a1da7ee7d2 100644 --- a/Lib/test/test_pathlib/test_pathlib_abc.py +++ b/Lib/test/test_pathlib/test_pathlib_abc.py @@ -314,76 +314,6 @@ def assertEqualNormCase(self, path_a, path_b): normcase = self.parser.normcase self.assertEqual(normcase(path_a), normcase(path_b)) - def test_is_readable(self): - p = self.cls(self.base) - self.assertIsInstance(p, _ReadablePath) - - def test_magic_open(self): - p = self.cls(self.base) - with magic_open(p / 'fileA', 'r') as f: - self.assertIsInstance(f, io.TextIOBase) - self.assertEqual(f.read(), "this is file A\n") - with magic_open(p / 'fileA', 'rb') as f: - self.assertIsInstance(f, io.BufferedIOBase) - self.assertEqual(f.read().strip(), b"this is file A") - - def test_iterdir(self): - P = self.cls - 
p = P(self.base) - it = p.iterdir() - paths = set(it) - expected = ['dirA', 'dirB', 'dirC', 'dirE', 'fileA'] - if self.can_symlink: - expected += ['linkA', 'linkB', 'brokenLink', 'brokenLinkLoop'] - self.assertEqual(paths, { P(self.base, q) for q in expected }) - - def test_iterdir_nodir(self): - # __iter__ on something that is not a directory. - p = self.cls(self.base, 'fileA') - with self.assertRaises(OSError) as cm: - p.iterdir() - # ENOENT or EINVAL under Windows, ENOTDIR otherwise - # (see issue #12802). - self.assertIn(cm.exception.errno, (errno.ENOTDIR, - errno.ENOENT, errno.EINVAL)) - - def test_iterdir_info(self): - p = self.cls(self.base) - for child in p.iterdir(): - self.assertIsInstance(child.info, PathInfo) - self.assertTrue(child.info.exists(follow_symlinks=False)) - - def test_glob_common(self): - def _check(glob, expected): - self.assertEqual(set(glob), { P(self.base, q) for q in expected }) - P = self.cls - p = P(self.base) - it = p.glob("fileA") - self.assertIsInstance(it, collections.abc.Iterator) - _check(it, ["fileA"]) - _check(p.glob("fileB"), []) - _check(p.glob("dir*/file*"), ["dirB/fileB", "dirC/fileC"]) - if not self.can_symlink: - _check(p.glob("*A"), ['dirA', 'fileA']) - else: - _check(p.glob("*A"), ['dirA', 'fileA', 'linkA']) - if not self.can_symlink: - _check(p.glob("*B/*"), ['dirB/fileB']) - else: - _check(p.glob("*B/*"), ['dirB/fileB', 'dirB/linkD', - 'linkB/fileB', 'linkB/linkD']) - if not self.can_symlink: - _check(p.glob("*/fileB"), ['dirB/fileB']) - else: - _check(p.glob("*/fileB"), ['dirB/fileB', 'linkB/fileB']) - if self.can_symlink: - _check(p.glob("brokenLink"), ['brokenLink']) - - if not self.can_symlink: - _check(p.glob("*/"), ["dirA/", "dirB/", "dirC/", "dirE/"]) - else: - _check(p.glob("*/"), ["dirA/", "dirB/", "dirC/", "dirE/", "linkB/"]) - @needs_posix def test_glob_posix(self): P = self.cls @@ -402,123 +332,6 @@ def test_glob_windows(self): self.assertEqual(set(p.glob("*a\\")), { P(self.base, "dirA/") }) 
self.assertEqual(set(p.glob("F*a")), { P(self.base, "fileA") }) - def test_glob_empty_pattern(self): - P = self.cls - p = P(self.base) - self.assertEqual(list(p.glob("")), [p.joinpath("")]) - - def test_info_exists(self): - p = self.cls(self.base) - self.assertTrue(p.info.exists()) - self.assertTrue((p / 'dirA').info.exists()) - self.assertTrue((p / 'dirA').info.exists(follow_symlinks=False)) - self.assertTrue((p / 'fileA').info.exists()) - self.assertTrue((p / 'fileA').info.exists(follow_symlinks=False)) - self.assertFalse((p / 'non-existing').info.exists()) - self.assertFalse((p / 'non-existing').info.exists(follow_symlinks=False)) - if self.can_symlink: - self.assertTrue((p / 'linkA').info.exists()) - self.assertTrue((p / 'linkA').info.exists(follow_symlinks=False)) - self.assertTrue((p / 'linkB').info.exists()) - self.assertTrue((p / 'linkB').info.exists(follow_symlinks=True)) - self.assertFalse((p / 'brokenLink').info.exists()) - self.assertTrue((p / 'brokenLink').info.exists(follow_symlinks=False)) - self.assertFalse((p / 'brokenLinkLoop').info.exists()) - self.assertTrue((p / 'brokenLinkLoop').info.exists(follow_symlinks=False)) - self.assertFalse((p / 'fileA\udfff').info.exists()) - self.assertFalse((p / 'fileA\udfff').info.exists(follow_symlinks=False)) - self.assertFalse((p / 'fileA\x00').info.exists()) - self.assertFalse((p / 'fileA\x00').info.exists(follow_symlinks=False)) - - def test_info_exists_caching(self): - p = self.cls(self.base) - q = p / 'myfile' - self.assertFalse(q.info.exists()) - self.assertFalse(q.info.exists(follow_symlinks=False)) - if isinstance(self.cls, _WritablePath): - q.write_text('hullo') - self.assertFalse(q.info.exists()) - self.assertFalse(q.info.exists(follow_symlinks=False)) - - def test_info_is_dir(self): - p = self.cls(self.base) - self.assertTrue((p / 'dirA').info.is_dir()) - self.assertTrue((p / 'dirA').info.is_dir(follow_symlinks=False)) - self.assertFalse((p / 'fileA').info.is_dir()) - self.assertFalse((p / 
'fileA').info.is_dir(follow_symlinks=False)) - self.assertFalse((p / 'non-existing').info.is_dir()) - self.assertFalse((p / 'non-existing').info.is_dir(follow_symlinks=False)) - if self.can_symlink: - self.assertFalse((p / 'linkA').info.is_dir()) - self.assertFalse((p / 'linkA').info.is_dir(follow_symlinks=False)) - self.assertTrue((p / 'linkB').info.is_dir()) - self.assertFalse((p / 'linkB').info.is_dir(follow_symlinks=False)) - self.assertFalse((p / 'brokenLink').info.is_dir()) - self.assertFalse((p / 'brokenLink').info.is_dir(follow_symlinks=False)) - self.assertFalse((p / 'brokenLinkLoop').info.is_dir()) - self.assertFalse((p / 'brokenLinkLoop').info.is_dir(follow_symlinks=False)) - self.assertFalse((p / 'dirA\udfff').info.is_dir()) - self.assertFalse((p / 'dirA\udfff').info.is_dir(follow_symlinks=False)) - self.assertFalse((p / 'dirA\x00').info.is_dir()) - self.assertFalse((p / 'dirA\x00').info.is_dir(follow_symlinks=False)) - - def test_info_is_dir_caching(self): - p = self.cls(self.base) - q = p / 'mydir' - self.assertFalse(q.info.is_dir()) - self.assertFalse(q.info.is_dir(follow_symlinks=False)) - if isinstance(self.cls, _WritablePath): - q.mkdir() - self.assertFalse(q.info.is_dir()) - self.assertFalse(q.info.is_dir(follow_symlinks=False)) - - def test_info_is_file(self): - p = self.cls(self.base) - self.assertTrue((p / 'fileA').info.is_file()) - self.assertTrue((p / 'fileA').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 'dirA').info.is_file()) - self.assertFalse((p / 'dirA').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 'non-existing').info.is_file()) - self.assertFalse((p / 'non-existing').info.is_file(follow_symlinks=False)) - if self.can_symlink: - self.assertTrue((p / 'linkA').info.is_file()) - self.assertFalse((p / 'linkA').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 'linkB').info.is_file()) - self.assertFalse((p / 'linkB').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 
'brokenLink').info.is_file()) - self.assertFalse((p / 'brokenLink').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 'brokenLinkLoop').info.is_file()) - self.assertFalse((p / 'brokenLinkLoop').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 'fileA\udfff').info.is_file()) - self.assertFalse((p / 'fileA\udfff').info.is_file(follow_symlinks=False)) - self.assertFalse((p / 'fileA\x00').info.is_file()) - self.assertFalse((p / 'fileA\x00').info.is_file(follow_symlinks=False)) - - def test_info_is_file_caching(self): - p = self.cls(self.base) - q = p / 'myfile' - self.assertFalse(q.info.is_file()) - self.assertFalse(q.info.is_file(follow_symlinks=False)) - if isinstance(self.cls, _WritablePath): - q.write_text('hullo') - self.assertFalse(q.info.is_file()) - self.assertFalse(q.info.is_file(follow_symlinks=False)) - - def test_info_is_symlink(self): - p = self.cls(self.base) - self.assertFalse((p / 'fileA').info.is_symlink()) - self.assertFalse((p / 'dirA').info.is_symlink()) - self.assertFalse((p / 'non-existing').info.is_symlink()) - if self.can_symlink: - self.assertTrue((p / 'linkA').info.is_symlink()) - self.assertTrue((p / 'linkB').info.is_symlink()) - self.assertTrue((p / 'brokenLink').info.is_symlink()) - self.assertFalse((p / 'linkA\udfff').info.is_symlink()) - self.assertFalse((p / 'linkA\x00').info.is_symlink()) - self.assertTrue((p / 'brokenLinkLoop').info.is_symlink()) - self.assertFalse((p / 'fileA\udfff').info.is_symlink()) - self.assertFalse((p / 'fileA\x00').info.is_symlink()) - class WritablePathTest(JoinablePathTest): cls = DummyWritablePath @@ -553,21 +366,6 @@ def test_read_write_text(self): self.assertRaises(TypeError, (p / 'fileA').write_text, b'somebytes') self.assertEqual((p / 'fileA').read_text(encoding='latin-1'), 'äbcdefg') - def test_read_text_with_newlines(self): - p = self.cls(self.base) - # Check that `\n` character change nothing - (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq') - self.assertEqual((p / 
'fileA').read_text(newline='\n'), - 'abcde\r\nfghlk\n\rmnopq') - # Check that `\r` character replaces `\n` - (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq') - self.assertEqual((p / 'fileA').read_text(newline='\r'), - 'abcde\r\nfghlk\n\rmnopq') - # Check that `\r\n` character replaces `\n` - (p / 'fileA').write_bytes(b'abcde\r\nfghlk\n\rmnopq') - self.assertEqual((p / 'fileA').read_text(newline='\r\n'), - 'abcde\r\nfghlk\n\rmnopq') - def test_write_text_with_newlines(self): p = self.cls(self.base) # Check that `\n` character change nothing @@ -763,72 +561,6 @@ def tearDown(self): cls._files.clear() cls._directories.clear() - def test_walk_topdown(self): - walker = self.walk_path.walk() - entry = next(walker) - entry[1].sort() # Ensure we visit SUB1 before SUB2 - self.assertEqual(entry, (self.walk_path, ["SUB1", "SUB2"], ["tmp1"])) - entry = next(walker) - self.assertEqual(entry, (self.sub1_path, ["SUB11"], ["tmp2"])) - entry = next(walker) - self.assertEqual(entry, (self.sub11_path, [], [])) - entry = next(walker) - entry[1].sort() - entry[2].sort() - self.assertEqual(entry, self.sub2_tree) - with self.assertRaises(StopIteration): - next(walker) - - def test_walk_prune(self): - # Prune the search. - all = [] - for root, dirs, files in self.walk_path.walk(): - all.append((root, dirs, files)) - if 'SUB1' in dirs: - # Note that this also mutates the dirs we appended to all! 
- dirs.remove('SUB1') - - self.assertEqual(len(all), 2) - self.assertEqual(all[0], (self.walk_path, ["SUB2"], ["tmp1"])) - - all[1][-1].sort() - all[1][1].sort() - self.assertEqual(all[1], self.sub2_tree) - - def test_walk_bottom_up(self): - seen_testfn = seen_sub1 = seen_sub11 = seen_sub2 = False - for path, dirnames, filenames in self.walk_path.walk(top_down=False): - if path == self.walk_path: - self.assertFalse(seen_testfn) - self.assertTrue(seen_sub1) - self.assertTrue(seen_sub2) - self.assertEqual(sorted(dirnames), ["SUB1", "SUB2"]) - self.assertEqual(filenames, ["tmp1"]) - seen_testfn = True - elif path == self.sub1_path: - self.assertFalse(seen_testfn) - self.assertFalse(seen_sub1) - self.assertTrue(seen_sub11) - self.assertEqual(dirnames, ["SUB11"]) - self.assertEqual(filenames, ["tmp2"]) - seen_sub1 = True - elif path == self.sub11_path: - self.assertFalse(seen_sub1) - self.assertFalse(seen_sub11) - self.assertEqual(dirnames, []) - self.assertEqual(filenames, []) - seen_sub11 = True - elif path == self.sub2_path: - self.assertFalse(seen_testfn) - self.assertFalse(seen_sub2) - self.assertEqual(sorted(dirnames), sorted(self.sub2_tree[1])) - self.assertEqual(sorted(filenames), sorted(self.sub2_tree[2])) - seen_sub2 = True - else: - raise AssertionError(f"Unexpected path: {path}") - self.assertTrue(seen_testfn) - - if __name__ == "__main__": unittest.main() diff --git a/Lib/test/test_pathlib/test_read.py b/Lib/test/test_pathlib/test_read.py new file mode 100644 index 00000000000000..49015dac3b3998 --- /dev/null +++ b/Lib/test/test_pathlib/test_read.py @@ -0,0 +1,309 @@ +""" +Tests for pathlib.types._ReadablePath +""" + +import collections.abc +import io +import unittest + +from pathlib import Path +from pathlib.types import PathInfo, _ReadablePath +from pathlib._os import magic_open + +from test.test_pathlib.support.local_path import ReadableLocalPath, LocalPathGround +from test.test_pathlib.support.zip_path import ReadableZipPath, ZipPathGround + + +class 
ReadTestBase: + def setUp(self): + self.root = self.ground.setup() + self.ground.create_hierarchy(self.root) + + def tearDown(self): + self.ground.teardown(self.root) + + def test_is_readable(self): + self.assertIsInstance(self.root, _ReadablePath) + + def test_open_r(self): + p = self.root / 'fileA' + with magic_open(p, 'r') as f: + self.assertIsInstance(f, io.TextIOBase) + self.assertEqual(f.read(), 'this is file A\n') + + def test_open_rb(self): + p = self.root / 'fileA' + with magic_open(p, 'rb') as f: + self.assertEqual(f.read(), b'this is file A\n') + + def test_read_bytes(self): + p = self.root / 'fileA' + self.assertEqual(p.read_bytes(), b'this is file A\n') + + def test_read_text(self): + p = self.root / 'fileA' + self.assertEqual(p.read_text(), 'this is file A\n') + q = self.root / 'abc' + self.ground.create_file(q, b'\xe4bcdefg') + self.assertEqual(q.read_text(encoding='latin-1'), 'äbcdefg') + self.assertEqual(q.read_text(encoding='utf-8', errors='ignore'), 'bcdefg') + + def test_read_text_with_newlines(self): + p = self.root / 'abc' + self.ground.create_file(p, b'abcde\r\nfghlk\n\rmnopq') + # Check that `\n` character change nothing + self.assertEqual(p.read_text(newline='\n'), 'abcde\r\nfghlk\n\rmnopq') + # Check that `\r` character replaces `\n` + self.assertEqual(p.read_text(newline='\r'), 'abcde\r\nfghlk\n\rmnopq') + # Check that `\r\n` character replaces `\n` + self.assertEqual(p.read_text(newline='\r\n'), 'abcde\r\nfghlk\n\rmnopq') + + def test_iterdir(self): + expected = ['dirA', 'dirB', 'dirC', 'fileA'] + if self.ground.can_symlink: + expected += ['linkA', 'linkB', 'brokenLink', 'brokenLinkLoop'] + expected = {self.root.joinpath(name) for name in expected} + actual = set(self.root.iterdir()) + self.assertEqual(actual, expected) + + def test_iterdir_nodir(self): + p = self.root / 'fileA' + self.assertRaises(OSError, p.iterdir) + + def test_iterdir_info(self): + for child in self.root.iterdir(): + self.assertIsInstance(child.info, PathInfo) + 
self.assertTrue(child.info.exists(follow_symlinks=False))
+
+    def test_glob(self):
+        if not self.ground.can_symlink:
+            self.skipTest("requires symlinks")
+
+        p = self.root
+        sep = self.root.parser.sep
+        altsep = self.root.parser.altsep
+        def check(pattern, expected):
+            if altsep:
+                expected = {name.replace(altsep, sep) for name in expected}
+            expected = {p.joinpath(name) for name in expected}
+            actual = set(p.glob(pattern, recurse_symlinks=True))
+            self.assertEqual(actual, expected)
+
+        it = p.glob("fileA")
+        self.assertIsInstance(it, collections.abc.Iterator)
+        self.assertEqual(list(it), [p.joinpath("fileA")])
+        check("*A", ["dirA", "fileA", "linkA"])
+        check("*A", ['dirA', 'fileA', 'linkA'])
+        check("*B/*", ["dirB/fileB", "linkB/fileB"])
+        check("*B/*", ['dirB/fileB', 'linkB/fileB'])
+        check("brokenLink", ['brokenLink'])
+        check("brokenLinkLoop", ['brokenLinkLoop'])
+        check("**/", ["", "dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/", "linkB/"])
+        check("**/*/", ["dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/", "linkB/"])
+        check("*/", ["dirA/", "dirB/", "dirC/", "linkB/"])
+        check("*/dirD/**/", ["dirC/dirD/"])
+        check("*/dirD/**", ["dirC/dirD/", "dirC/dirD/fileD"])
+        check("dir*/**", ["dirA/", "dirA/linkC", "dirA/linkC/fileB", "dirB/", "dirB/fileB", "dirC/",
+                          "dirC/fileC", "dirC/dirD", "dirC/dirD/fileD", "dirC/novel.txt"])
+        check("dir*/**/", ["dirA/", "dirA/linkC/", "dirB/", "dirC/", "dirC/dirD/"])
+        check("dir*/**/..", ["dirA/..", "dirA/linkC/..", "dirB/..", "dirC/..", "dirC/dirD/.."])
+        check("dir*/*/**", ["dirA/linkC/", "dirA/linkC/fileB", "dirC/dirD/", "dirC/dirD/fileD"])
+        check("dir*/*/**/", ["dirA/linkC/", "dirC/dirD/"])
+        check("dir*/*/**/..", ["dirA/linkC/..", "dirC/dirD/.."])
+        check("dir*/*/..", ["dirC/dirD/..", "dirA/linkC/.."])
+        check("dir*/*/../dirD/**/", ["dirC/dirD/../dirD/"])
+        check("dir*/**/fileC", ["dirC/fileC"])
+        check("dir*/file*", ["dirB/fileB", "dirC/fileC"])
+        check("**/*/fileA", [])
+        check("fileB", [])
+        check("**/*/fileB", ["dirB/fileB", "dirA/linkC/fileB", "linkB/fileB"])
+        check("**/fileB", ["dirB/fileB", "dirA/linkC/fileB", "linkB/fileB"])
+        check("*/fileB", ["dirB/fileB", "linkB/fileB"])
+        check("*/fileB", ['dirB/fileB', 'linkB/fileB'])
+        check("**/file*",
+              ["fileA", "dirA/linkC/fileB", "dirB/fileB", "dirC/fileC", "dirC/dirD/fileD",
+               "linkB/fileB"])
+
+    def test_walk_top_down(self):
+        it = self.root.walk()
+
+        path, dirnames, filenames = next(it)
+        dirnames.sort()
+        filenames.sort()
+        self.assertEqual(path, self.root)
+        self.assertEqual(dirnames, ['dirA', 'dirB', 'dirC'])
+        self.assertEqual(filenames, ['brokenLink', 'brokenLinkLoop', 'fileA', 'linkA', 'linkB']
+                                    if self.ground.can_symlink else ['fileA'])
+
+        path, dirnames, filenames = next(it)
+        self.assertEqual(path, self.root / 'dirA')
+        self.assertEqual(dirnames, [])
+        self.assertEqual(filenames, ['linkC'] if self.ground.can_symlink else [])
+
+        path, dirnames, filenames = next(it)
+        self.assertEqual(path, self.root / 'dirB')
+        self.assertEqual(dirnames, [])
+        self.assertEqual(filenames, ['fileB'])
+
+        path, dirnames, filenames = next(it)
+        filenames.sort()
+        self.assertEqual(path, self.root / 'dirC')
+        self.assertEqual(dirnames, ['dirD'])
+        self.assertEqual(filenames, ['fileC', 'novel.txt'])
+
+        path, dirnames, filenames = next(it)
+        self.assertEqual(path, self.root / 'dirC' / 'dirD')
+        self.assertEqual(dirnames, [])
+        self.assertEqual(filenames, ['fileD'])
+
+        self.assertRaises(StopIteration, next, it)
+
+    def test_walk_prune(self):
+        expected = {self.root, self.root / 'dirA', self.root / 'dirC', self.root / 'dirC' / 'dirD'}
+        actual = set()
+        for path, dirnames, filenames in self.root.walk():
+            actual.add(path)
+            if path == self.root:
+                dirnames.remove('dirB')
+        self.assertEqual(actual, expected)
+
+    def test_walk_bottom_up(self):
+        seen_root = seen_dira = seen_dirb = seen_dirc = seen_dird = False
+        for path, dirnames, filenames in self.root.walk(top_down=False):
+            if path == self.root:
+                self.assertFalse(seen_root)
+                self.assertTrue(seen_dira)
+                self.assertTrue(seen_dirb)
+                self.assertTrue(seen_dirc)
+                self.assertEqual(sorted(dirnames), ['dirA', 'dirB', 'dirC'])
+                self.assertEqual(sorted(filenames),
+                                 ['brokenLink', 'brokenLinkLoop', 'fileA', 'linkA', 'linkB']
+                                 if self.ground.can_symlink else ['fileA'])
+                seen_root = True
+            elif path == self.root / 'dirA':
+                self.assertFalse(seen_root)
+                self.assertFalse(seen_dira)
+                self.assertEqual(dirnames, [])
+                self.assertEqual(filenames, ['linkC'] if self.ground.can_symlink else [])
+                seen_dira = True
+            elif path == self.root / 'dirB':
+                self.assertFalse(seen_root)
+                self.assertFalse(seen_dirb)
+                self.assertEqual(dirnames, [])
+                self.assertEqual(filenames, ['fileB'])
+                seen_dirb = True
+            elif path == self.root / 'dirC':
+                self.assertFalse(seen_root)
+                self.assertFalse(seen_dirc)
+                self.assertTrue(seen_dird)
+                self.assertEqual(dirnames, ['dirD'])
+                self.assertEqual(sorted(filenames), ['fileC', 'novel.txt'])
+                seen_dirc = True
+            elif path == self.root / 'dirC' / 'dirD':
+                self.assertFalse(seen_root)
+                self.assertFalse(seen_dirc)
+                self.assertFalse(seen_dird)
+                self.assertEqual(dirnames, [])
+                self.assertEqual(filenames, ['fileD'])
+                seen_dird = True
+            else:
+                raise AssertionError(f"Unexpected path: {path}")
+        self.assertTrue(seen_root)
+
+    def test_info_exists(self):
+        p = self.root
+        self.assertTrue(p.info.exists())
+        self.assertTrue((p / 'dirA').info.exists())
+        self.assertTrue((p / 'dirA').info.exists(follow_symlinks=False))
+        self.assertTrue((p / 'fileA').info.exists())
+        self.assertTrue((p / 'fileA').info.exists(follow_symlinks=False))
+        self.assertFalse((p / 'non-existing').info.exists())
+        self.assertFalse((p / 'non-existing').info.exists(follow_symlinks=False))
+        if self.ground.can_symlink:
+            self.assertTrue((p / 'linkA').info.exists())
+            self.assertTrue((p / 'linkA').info.exists(follow_symlinks=False))
+            self.assertTrue((p / 'linkB').info.exists())
+            self.assertTrue((p / 'linkB').info.exists(follow_symlinks=True))
+            self.assertFalse((p / 'brokenLink').info.exists())
+            self.assertTrue((p / 'brokenLink').info.exists(follow_symlinks=False))
+            self.assertFalse((p / 'brokenLinkLoop').info.exists())
+            self.assertTrue((p / 'brokenLinkLoop').info.exists(follow_symlinks=False))
+        self.assertFalse((p / 'fileA\udfff').info.exists())
+        self.assertFalse((p / 'fileA\udfff').info.exists(follow_symlinks=False))
+        self.assertFalse((p / 'fileA\x00').info.exists())
+        self.assertFalse((p / 'fileA\x00').info.exists(follow_symlinks=False))
+
+    def test_info_is_dir(self):
+        p = self.root
+        self.assertTrue((p / 'dirA').info.is_dir())
+        self.assertTrue((p / 'dirA').info.is_dir(follow_symlinks=False))
+        self.assertFalse((p / 'fileA').info.is_dir())
+        self.assertFalse((p / 'fileA').info.is_dir(follow_symlinks=False))
+        self.assertFalse((p / 'non-existing').info.is_dir())
+        self.assertFalse((p / 'non-existing').info.is_dir(follow_symlinks=False))
+        if self.ground.can_symlink:
+            self.assertFalse((p / 'linkA').info.is_dir())
+            self.assertFalse((p / 'linkA').info.is_dir(follow_symlinks=False))
+            self.assertTrue((p / 'linkB').info.is_dir())
+            self.assertFalse((p / 'linkB').info.is_dir(follow_symlinks=False))
+            self.assertFalse((p / 'brokenLink').info.is_dir())
+            self.assertFalse((p / 'brokenLink').info.is_dir(follow_symlinks=False))
+            self.assertFalse((p / 'brokenLinkLoop').info.is_dir())
+            self.assertFalse((p / 'brokenLinkLoop').info.is_dir(follow_symlinks=False))
+        self.assertFalse((p / 'dirA\udfff').info.is_dir())
+        self.assertFalse((p / 'dirA\udfff').info.is_dir(follow_symlinks=False))
+        self.assertFalse((p / 'dirA\x00').info.is_dir())
+        self.assertFalse((p / 'dirA\x00').info.is_dir(follow_symlinks=False))
+
+    def test_info_is_file(self):
+        p = self.root
+        self.assertTrue((p / 'fileA').info.is_file())
+        self.assertTrue((p / 'fileA').info.is_file(follow_symlinks=False))
+        self.assertFalse((p / 'dirA').info.is_file())
+        self.assertFalse((p / 'dirA').info.is_file(follow_symlinks=False))
+        self.assertFalse((p / 'non-existing').info.is_file())
+        self.assertFalse((p / 'non-existing').info.is_file(follow_symlinks=False))
+        if self.ground.can_symlink:
+            self.assertTrue((p / 'linkA').info.is_file())
+            self.assertFalse((p / 'linkA').info.is_file(follow_symlinks=False))
+            self.assertFalse((p / 'linkB').info.is_file())
+            self.assertFalse((p / 'linkB').info.is_file(follow_symlinks=False))
+            self.assertFalse((p / 'brokenLink').info.is_file())
+            self.assertFalse((p / 'brokenLink').info.is_file(follow_symlinks=False))
+            self.assertFalse((p / 'brokenLinkLoop').info.is_file())
+            self.assertFalse((p / 'brokenLinkLoop').info.is_file(follow_symlinks=False))
+        self.assertFalse((p / 'fileA\udfff').info.is_file())
+        self.assertFalse((p / 'fileA\udfff').info.is_file(follow_symlinks=False))
+        self.assertFalse((p / 'fileA\x00').info.is_file())
+        self.assertFalse((p / 'fileA\x00').info.is_file(follow_symlinks=False))
+
+    def test_info_is_symlink(self):
+        p = self.root
+        self.assertFalse((p / 'fileA').info.is_symlink())
+        self.assertFalse((p / 'dirA').info.is_symlink())
+        self.assertFalse((p / 'non-existing').info.is_symlink())
+        if self.ground.can_symlink:
+            self.assertTrue((p / 'linkA').info.is_symlink())
+            self.assertTrue((p / 'linkB').info.is_symlink())
+            self.assertTrue((p / 'brokenLink').info.is_symlink())
+            self.assertFalse((p / 'linkA\udfff').info.is_symlink())
+            self.assertFalse((p / 'linkA\x00').info.is_symlink())
+            self.assertTrue((p / 'brokenLinkLoop').info.is_symlink())
+        self.assertFalse((p / 'fileA\udfff').info.is_symlink())
+        self.assertFalse((p / 'fileA\x00').info.is_symlink())
+
+
+class ZipPathReadTest(ReadTestBase, unittest.TestCase):
+    ground = ZipPathGround(ReadableZipPath)
+
+
+class LocalPathReadTest(ReadTestBase, unittest.TestCase):
+    ground = LocalPathGround(ReadableLocalPath)
+
+
+class PathReadTest(ReadTestBase, unittest.TestCase):
+    ground = LocalPathGround(Path)
+
+
+if __name__ == "__main__":
+    unittest.main()
_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com