https://github.com/python/cpython/commit/805e3368d6d07e58430654d1365283924fdf4143 commit: 805e3368d6d07e58430654d1365283924fdf4143 branch: main author: Barney Gale <[email protected]> committer: barneygale <[email protected]> date: 2025-09-12T22:25:18+01:00 summary:
GH-128520: pathlib ABCs: improve protocol for 'openable' objects (#134101) Rename `pathlib._os.magic_open()` to `vfsopen()`. The new name is a bit less abstract, and it aligns with the `vfspath()` method added in 5dbd27d. Per discussion on discourse[^1], adjust `vfsopen()` so that the following methods may be called: - `__open_reader__()` - `__open_writer__(mode)` - `__open_updater__(mode)` These three methods return readable, writable, and full duplex file objects respectively. In the 'writer' method, *mode* is either 'a', 'w' or 'x'. In the 'updater' method, *mode* is either 'r' or 'w'. In the pathlib ABCs, replace `ReadablePath.__open_rb__()` with `__open_reader__()`, and replace `WritablePath.__open_wb__()` with `__open_writer__()`. [^1]: https://discuss.python.org/t/open-able-objects/90238 Co-authored-by: Petr Viktorin <[email protected]> files: M Lib/pathlib/__init__.py M Lib/pathlib/_os.py M Lib/pathlib/types.py M Lib/test/test_pathlib/support/local_path.py M Lib/test/test_pathlib/support/zip_path.py M Lib/test/test_pathlib/test_read.py M Lib/test/test_pathlib/test_write.py diff --git a/Lib/pathlib/__init__.py b/Lib/pathlib/__init__.py index cea1a9fe57eedf..bc39a30c6538ce 100644 --- a/Lib/pathlib/__init__.py +++ b/Lib/pathlib/__init__.py @@ -28,7 +28,7 @@ from pathlib._os import ( PathInfo, DirEntryInfo, - magic_open, vfspath, + vfsopen, vfspath, ensure_different_files, ensure_distinct_paths, copyfile2, copyfileobj, copy_info, ) @@ -1129,7 +1129,7 @@ def _copy_from(self, source, follow_symlinks=True, preserve_metadata=False): def _copy_from_file(self, source, preserve_metadata=False): ensure_different_files(source, self) - with magic_open(source, 'rb') as source_f: + with vfsopen(source, 'rb') as source_f: with open(self, 'wb') as target_f: copyfileobj(source_f, target_f) if preserve_metadata: diff --git a/Lib/pathlib/_os.py b/Lib/pathlib/_os.py index fbcbfb979d1278..6508a9bca0d72b 100644 --- a/Lib/pathlib/_os.py +++ b/Lib/pathlib/_os.py @@ -166,48 +166,86 @@ def copyfileobj(source_f, target_f): write_target(buf) -def magic_open(path, mode='r', buffering=-1, encoding=None, errors=None, - newline=None): +def _open_reader(obj): + cls = type(obj) + try: + open_reader = cls.__open_reader__ + except AttributeError: + cls_name = cls.__name__ + raise TypeError(f"{cls_name} can't be opened for reading") from None + else: + return open_reader(obj) + + +def _open_writer(obj, mode): + cls = type(obj) + try: + open_writer = cls.__open_writer__ + except AttributeError: + cls_name = cls.__name__ + raise TypeError(f"{cls_name} can't be opened for writing") from None + else: + return open_writer(obj, mode) + + +def _open_updater(obj, mode): + cls = type(obj) + try: + open_updater = cls.__open_updater__ + except AttributeError: + cls_name = cls.__name__ + raise TypeError(f"{cls_name} can't be opened for updating") from None + else: + return open_updater(obj, mode) + + +def vfsopen(obj, mode='r', buffering=-1, encoding=None, errors=None, + newline=None): """ Open the file pointed to by this path and return a file object, as the built-in open() function does. + + Unlike the built-in open() function, this function additionally accepts + 'openable' objects, which are objects with any of these special methods: + + __open_reader__() + __open_writer__(mode) + __open_updater__(mode) + + '__open_reader__' is called for 'r' mode; '__open_writer__' for 'a', 'w' + and 'x' modes; and '__open_updater__' for 'r+' and 'w+' modes. If text + mode is requested, the result is wrapped in an io.TextIOWrapper object. """ + if buffering != -1: + raise ValueError("buffer size can't be customized") text = 'b' not in mode if text: # Call io.text_encoding() here to ensure any warning is raised at an # appropriate stack level. encoding = text_encoding(encoding) try: - return open(path, mode, buffering, encoding, errors, newline) + return open(obj, mode, buffering, encoding, errors, newline) except TypeError: pass - cls = type(path) + if not text: + if encoding is not None: + raise ValueError("binary mode doesn't take an encoding argument") + if errors is not None: + raise ValueError("binary mode doesn't take an errors argument") + if newline is not None: + raise ValueError("binary mode doesn't take a newline argument") mode = ''.join(sorted(c for c in mode if c not in 'bt')) - if text: - try: - attr = getattr(cls, f'__open_{mode}__') - except AttributeError: - pass - else: - return attr(path, buffering, encoding, errors, newline) - elif encoding is not None: - raise ValueError("binary mode doesn't take an encoding argument") - elif errors is not None: - raise ValueError("binary mode doesn't take an errors argument") - elif newline is not None: - raise ValueError("binary mode doesn't take a newline argument") - - try: - attr = getattr(cls, f'__open_{mode}b__') - except AttributeError: - pass + if mode == 'r': + stream = _open_reader(obj) + elif mode in ('a', 'w', 'x'): + stream = _open_writer(obj, mode) + elif mode in ('+r', '+w'): + stream = _open_updater(obj, mode[1]) else: - stream = attr(path, buffering) - if text: - stream = TextIOWrapper(stream, encoding, errors, newline) - return stream - - raise TypeError(f"{cls.__name__} can't be opened with mode {mode!r}") + raise ValueError(f'invalid mode: {mode}') + if text: + stream = TextIOWrapper(stream, encoding, errors, newline) + return stream def vfspath(obj): diff --git a/Lib/pathlib/types.py b/Lib/pathlib/types.py index 42b80221608bcc..fea0dd305fe2a3 100644 --- a/Lib/pathlib/types.py +++ b/Lib/pathlib/types.py @@ -13,7 +13,7 @@ from abc import ABC, abstractmethod from glob import _GlobberBase from io import text_encoding -from pathlib._os import (magic_open, vfspath, ensure_distinct_paths, +from pathlib._os import (vfsopen, vfspath, ensure_distinct_paths, ensure_different_files, copyfileobj) from pathlib import PurePath, Path from typing import Optional, Protocol, runtime_checkable @@ -264,10 +264,10 @@ def info(self): raise NotImplementedError @abstractmethod - def __open_rb__(self, buffering=-1): + def __open_reader__(self): """ Open the file pointed to by this path for reading in binary mode and - return a file object, like open(mode='rb'). + return a file object. """ raise NotImplementedError @@ -275,7 +275,7 @@ def read_bytes(self): """ Open the file in bytes mode, read it, and close the file. """ - with magic_open(self, mode='rb', buffering=0) as f: + with vfsopen(self, mode='rb') as f: return f.read() def read_text(self, encoding=None, errors=None, newline=None): @@ -285,7 +285,7 @@ def read_text(self, encoding=None, errors=None, newline=None): # Call io.text_encoding() here to ensure any warning is raised at an # appropriate stack level. encoding = text_encoding(encoding) - with magic_open(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f: + with vfsopen(self, mode='r', encoding=encoding, errors=errors, newline=newline) as f: return f.read() @abstractmethod @@ -394,10 +394,10 @@ def mkdir(self): raise NotImplementedError @abstractmethod - def __open_wb__(self, buffering=-1): + def __open_writer__(self, mode): """ Open the file pointed to by this path for writing in binary mode and - return a file object, like open(mode='wb'). + return a file object. """ raise NotImplementedError @@ -407,7 +407,7 @@ def write_bytes(self, data): """ # type-check for the buffer interface before truncating the file view = memoryview(data) - with magic_open(self, mode='wb') as f: + with vfsopen(self, mode='wb') as f: return f.write(view) def write_text(self, data, encoding=None, errors=None, newline=None): @@ -420,7 +420,7 @@ def write_text(self, data, encoding=None, errors=None, newline=None): if not isinstance(data, str): raise TypeError('data must be str, not %s' % data.__class__.__name__) - with magic_open(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f: + with vfsopen(self, mode='w', encoding=encoding, errors=errors, newline=newline) as f: return f.write(data) def _copy_from(self, source, follow_symlinks=True): @@ -439,8 +439,8 @@ def _copy_from(self, source, follow_symlinks=True): stack.append((child, dst.joinpath(child.name))) else: ensure_different_files(src, dst) - with magic_open(src, 'rb') as source_f: - with magic_open(dst, 'wb') as target_f: + with vfsopen(src, 'rb') as source_f: + with vfsopen(dst, 'wb') as target_f: copyfileobj(source_f, target_f) diff --git a/Lib/test/test_pathlib/support/local_path.py b/Lib/test/test_pathlib/support/local_path.py index c1423c545bfd00..ddfd6fd419533c 100644 --- a/Lib/test/test_pathlib/support/local_path.py +++ b/Lib/test/test_pathlib/support/local_path.py @@ -145,7 +145,7 @@ def __init__(self, *pathsegments): super().__init__(*pathsegments) self.info = LocalPathInfo(self) - def __open_rb__(self, buffering=-1): + def __open_reader__(self): return open(self, 'rb') def iterdir(self): @@ -163,8 +163,8 @@ class WritableLocalPath(_WritablePath, LexicalPath): __slots__ = () __fspath__ = LexicalPath.__vfspath__ - def __open_wb__(self, buffering=-1): - return open(self, 'wb') + def __open_writer__(self, mode): + return open(self, f'{mode}b') def mkdir(self, mode=0o777): os.mkdir(self, mode) diff --git a/Lib/test/test_pathlib/support/zip_path.py b/Lib/test/test_pathlib/support/zip_path.py index 2bfe89b36595b0..90b939b6a59010 100644 --- a/Lib/test/test_pathlib/support/zip_path.py +++ b/Lib/test/test_pathlib/support/zip_path.py @@ -264,13 +264,13 @@ def info(self): tree = self.zip_file.filelist.tree return tree.resolve(vfspath(self), follow_symlinks=False) - def __open_rb__(self, buffering=-1): + def __open_reader__(self): info = self.info.resolve() if not info.exists(): raise FileNotFoundError(errno.ENOENT, "File not found", self) elif info.is_dir(): raise IsADirectoryError(errno.EISDIR, "Is a directory", self) - return self.zip_file.open(info.zip_info, 'r') + return self.zip_file.open(info.zip_info) def iterdir(self): info = self.info.resolve() @@ -320,8 +320,8 @@ def __repr__(self): def with_segments(self, *pathsegments): return type(self)(*pathsegments, zip_file=self.zip_file) - def __open_wb__(self, buffering=-1): - return self.zip_file.open(vfspath(self), 'w') + def __open_writer__(self, mode): + return self.zip_file.open(vfspath(self), mode) def mkdir(self, mode=0o777): zinfo = zipfile.ZipInfo(vfspath(self) + '/') diff --git a/Lib/test/test_pathlib/test_read.py b/Lib/test/test_pathlib/test_read.py index 482203c290a3c4..16fb555b2aee05 100644 --- a/Lib/test/test_pathlib/test_read.py +++ b/Lib/test/test_pathlib/test_read.py @@ -13,10 +13,10 @@ if is_pypi: from pathlib_abc import PathInfo, _ReadablePath - from pathlib_abc._os import magic_open + from pathlib_abc._os import vfsopen else: from pathlib.types import PathInfo, _ReadablePath - from pathlib._os import magic_open + from pathlib._os import vfsopen class ReadTestBase: @@ -32,10 +32,16 @@ def test_is_readable(self): def test_open_r(self): p = self.root / 'fileA' - with magic_open(p, 'r', encoding='utf-8') as f: + with vfsopen(p, 'r', encoding='utf-8') as f: self.assertIsInstance(f, io.TextIOBase) self.assertEqual(f.read(), 'this is file A\n') + def test_open_r_buffering_error(self): + p = self.root / 'fileA' + self.assertRaises(ValueError, vfsopen, p, 'r', buffering=0) + self.assertRaises(ValueError, vfsopen, p, 'r', buffering=1) + self.assertRaises(ValueError, vfsopen, p, 'r', buffering=1024) + @unittest.skipIf( not getattr(sys.flags, 'warn_default_encoding', 0), "Requires warn_default_encoding", @@ -43,17 +49,17 @@ def test_open_r(self): def test_open_r_encoding_warning(self): p = self.root / 'fileA' with self.assertWarns(EncodingWarning) as wc: - with magic_open(p, 'r'): + with vfsopen(p, 'r'): pass self.assertEqual(wc.filename, __file__) def test_open_rb(self): p = self.root / 'fileA' - with magic_open(p, 'rb') as f: + with vfsopen(p, 'rb') as f: self.assertEqual(f.read(), b'this is file A\n') - self.assertRaises(ValueError, magic_open, p, 'rb', encoding='utf8') - self.assertRaises(ValueError, magic_open, p, 'rb', errors='strict') - self.assertRaises(ValueError, magic_open, p, 'rb', newline='') + self.assertRaises(ValueError, vfsopen, p, 'rb', encoding='utf8') + self.assertRaises(ValueError, vfsopen, p, 'rb', errors='strict') + self.assertRaises(ValueError, vfsopen, p, 'rb', newline='') def test_read_bytes(self): p = self.root / 'fileA' diff --git a/Lib/test/test_pathlib/test_write.py b/Lib/test/test_pathlib/test_write.py index b958490d0a834f..c9c1d64656c9be 100644 --- a/Lib/test/test_pathlib/test_write.py +++ b/Lib/test/test_pathlib/test_write.py @@ -13,10 +13,10 @@ if is_pypi: from pathlib_abc import _WritablePath - from pathlib_abc._os import magic_open + from pathlib_abc._os import vfsopen else: from pathlib.types import _WritablePath - from pathlib._os import magic_open + from pathlib._os import vfsopen class WriteTestBase: @@ -31,11 +31,17 @@ def test_is_writable(self): def test_open_w(self): p = self.root / 'fileA' - with magic_open(p, 'w', encoding='utf-8') as f: + with vfsopen(p, 'w', encoding='utf-8') as f: self.assertIsInstance(f, io.TextIOBase) f.write('this is file A\n') self.assertEqual(self.ground.readtext(p), 'this is file A\n') + def test_open_w_buffering_error(self): + p = self.root / 'fileA' + self.assertRaises(ValueError, vfsopen, p, 'w', buffering=0) + self.assertRaises(ValueError, vfsopen, p, 'w', buffering=1) + self.assertRaises(ValueError, vfsopen, p, 'w', buffering=1024) + @unittest.skipIf( not getattr(sys.flags, 'warn_default_encoding', 0), "Requires warn_default_encoding", @@ -43,19 +49,19 @@ def test_open_w(self): def test_open_w_encoding_warning(self): p = self.root / 'fileA' with self.assertWarns(EncodingWarning) as wc: - with magic_open(p, 'w'): + with vfsopen(p, 'w'): pass self.assertEqual(wc.filename, __file__) def test_open_wb(self): p = self.root / 'fileA' - with magic_open(p, 'wb') as f: + with vfsopen(p, 'wb') as f: #self.assertIsInstance(f, io.BufferedWriter) f.write(b'this is file A\n') self.assertEqual(self.ground.readbytes(p), b'this is file A\n') - self.assertRaises(ValueError, magic_open, p, 'wb', encoding='utf8') - self.assertRaises(ValueError, magic_open, p, 'wb', errors='strict') - self.assertRaises(ValueError, magic_open, p, 'wb', newline='') + self.assertRaises(ValueError, vfsopen, p, 'wb', encoding='utf8') + self.assertRaises(ValueError, vfsopen, p, 'wb', errors='strict') + self.assertRaises(ValueError, vfsopen, p, 'wb', newline='') def test_write_bytes(self): p = self.root / 'fileA' _______________________________________________ Python-checkins mailing list -- [email protected] To unsubscribe send an email to [email protected] https://mail.python.org/mailman3//lists/python-checkins.python.org Member address: [email protected]
