https://github.com/python/cpython/commit/8a5a18a36ee118e11c0b45c3dfe8bd69e5db82f9
commit: 8a5a18a36ee118e11c0b45c3dfe8bd69e5db82f9
branch: main
author: Victor Stinner <[email protected]>
committer: vstinner <[email protected]>
date: 2025-01-27T18:04:45+01:00
summary:

gh-129205: Modernize test_eintr (#129316)

* Use f-string.
* Fix grammar: replace 'datas' with 'data' (and replace 'data' with
  'item').
* Remove unused variables: 'pid' and 'old_mask'.
* Factorize test_read() and test_readinto() code.

Co-authored-by: Cody Maloney <[email protected]>

files:
M Lib/test/_test_eintr.py

diff --git a/Lib/test/_test_eintr.py b/Lib/test/_test_eintr.py
index ae799808ca067e..0ce42276bfe3d6 100644
--- a/Lib/test/_test_eintr.py
+++ b/Lib/test/_test_eintr.py
@@ -91,7 +91,7 @@ class OSEINTRTest(EINTRBaseTest):
     """ EINTR tests for the os module. """
 
     def new_sleep_process(self):
-        code = 'import time; time.sleep(%r)' % self.sleep_time
+        code = f'import time; time.sleep({self.sleep_time!r})'
         return self.subprocess(code)
 
     def _test_wait_multiple(self, wait_func):
@@ -123,65 +123,45 @@ def test_waitpid(self):
     def test_wait4(self):
         self._test_wait_single(lambda pid: os.wait4(pid, 0))
 
-    def test_read(self):
+    def _interrupted_reads(self):
+        """Make a fd which will force block on read of expected bytes."""
         rd, wr = os.pipe()
         self.addCleanup(os.close, rd)
         # wr closed explicitly by parent
 
         # the payload below are smaller than PIPE_BUF, hence the writes will be
         # atomic
-        datas = [b"hello", b"world", b"spam"]
+        data = [b"hello", b"world", b"spam"]
 
         code = '\n'.join((
             'import os, sys, time',
             '',
             'wr = int(sys.argv[1])',
-            'datas = %r' % datas,
-            'sleep_time = %r' % self.sleep_time,
+            f'data = {data!r}',
+            f'sleep_time = {self.sleep_time!r}',
             '',
-            'for data in datas:',
+            'for item in data:',
             '    # let the parent block on read()',
             '    time.sleep(sleep_time)',
-            '    os.write(wr, data)',
+            '    os.write(wr, item)',
         ))
 
         proc = self.subprocess(code, str(wr), pass_fds=[wr])
         with kill_on_error(proc):
             os.close(wr)
-            for data in datas:
-                self.assertEqual(data, os.read(rd, len(data)))
+            for datum in data:
+                yield rd, datum
             self.assertEqual(proc.wait(), 0)
 
-    def test_readinto(self):
-        rd, wr = os.pipe()
-        self.addCleanup(os.close, rd)
-        # wr closed explicitly by parent
-
-        # the payload below are smaller than PIPE_BUF, hence the writes will be
-        # atomic
-        datas = [b"hello", b"world", b"spam"]
-
-        code = '\n'.join((
-            'import os, sys, time',
-            '',
-            'wr = int(sys.argv[1])',
-            'datas = %r' % datas,
-            'sleep_time = %r' % self.sleep_time,
-            '',
-            'for data in datas:',
-            '    # let the parent block on read()',
-            '    time.sleep(sleep_time)',
-            '    os.write(wr, data)',
-        ))
+    def test_read(self):
+        for fd, expected in self._interrupted_reads():
+            self.assertEqual(expected, os.read(fd, len(expected)))
 
-        proc = self.subprocess(code, str(wr), pass_fds=[wr])
-        with kill_on_error(proc):
-            os.close(wr)
-            for data in datas:
-                buffer = bytearray(len(data))
-                self.assertEqual(os.readinto(rd, buffer), len(data))
-                self.assertEqual(buffer, data)
-            self.assertEqual(proc.wait(), 0)
+    def test_readinto(self):
+        for fd, expected in self._interrupted_reads():
+            buffer = bytearray(len(expected))
+            self.assertEqual(os.readinto(fd, buffer), len(expected))
+            self.assertEqual(buffer, expected)
 
     def test_write(self):
         rd, wr = os.pipe()
@@ -195,8 +175,8 @@ def test_write(self):
             'import io, os, sys, time',
             '',
             'rd = int(sys.argv[1])',
-            'sleep_time = %r' % self.sleep_time,
-            'data = b"x" * %s' % support.PIPE_MAX_SIZE,
+            f'sleep_time = {self.sleep_time!r}',
+            f'data = b"x" * {support.PIPE_MAX_SIZE}',
             'data_len = len(data)',
             '',
             '# let the parent block on write()',
@@ -209,8 +189,8 @@ def test_write(self):
             '',
             'value = read_data.getvalue()',
             'if value != data:',
-            '    raise Exception("read error: %s vs %s bytes"',
-            '                    % (len(value), data_len))',
+            '    raise Exception(f"read error: {len(value)}'
+                                  ' vs {data_len} bytes")',
         ))
 
         proc = self.subprocess(code, str(rd), pass_fds=[rd])
@@ -233,33 +213,33 @@ def _test_recv(self, recv_func):
         # wr closed explicitly by parent
 
         # single-byte payload guard us against partial recv
-        datas = [b"x", b"y", b"z"]
+        data = [b"x", b"y", b"z"]
 
         code = '\n'.join((
             'import os, socket, sys, time',
             '',
             'fd = int(sys.argv[1])',
-            'family = %s' % int(wr.family),
-            'sock_type = %s' % int(wr.type),
-            'datas = %r' % datas,
-            'sleep_time = %r' % self.sleep_time,
+            f'family = {int(wr.family)}',
+            f'sock_type = {int(wr.type)}',
+            f'data = {data!r}',
+            f'sleep_time = {self.sleep_time!r}',
             '',
             'wr = socket.fromfd(fd, family, sock_type)',
             'os.close(fd)',
             '',
             'with wr:',
-            '    for data in datas:',
+            '    for item in data:',
             '        # let the parent block on recv()',
             '        time.sleep(sleep_time)',
-            '        wr.sendall(data)',
+            '        wr.sendall(item)',
         ))
 
         fd = wr.fileno()
         proc = self.subprocess(code, str(fd), pass_fds=[fd])
         with kill_on_error(proc):
             wr.close()
-            for data in datas:
-                self.assertEqual(data, recv_func(rd, len(data)))
+            for item in data:
+                self.assertEqual(item, recv_func(rd, len(item)))
             self.assertEqual(proc.wait(), 0)
 
     def test_recv(self):
@@ -281,10 +261,10 @@ def _test_send(self, send_func):
             'import os, socket, sys, time',
             '',
             'fd = int(sys.argv[1])',
-            'family = %s' % int(rd.family),
-            'sock_type = %s' % int(rd.type),
-            'sleep_time = %r' % self.sleep_time,
-            'data = b"xyz" * %s' % (support.SOCK_MAX_SIZE // 3),
+            f'family = {int(rd.family)}',
+            f'sock_type = {int(rd.type)}',
+            f'sleep_time = {self.sleep_time!r}',
+            f'data = b"xyz" * {support.SOCK_MAX_SIZE // 3}',
             'data_len = len(data)',
             '',
             'rd = socket.fromfd(fd, family, sock_type)',
@@ -300,8 +280,8 @@ def _test_send(self, send_func):
             '        n += rd.recv_into(memoryview(received_data)[n:])',
             '',
             'if received_data != data:',
-            '    raise Exception("recv error: %s vs %s bytes"',
-            '                    % (len(received_data), data_len))',
+            '    raise Exception(f"recv error: {len(received_data)}'
+                                  ' vs {data_len} bytes")',
         ))
 
         fd = rd.fileno()
@@ -333,9 +313,9 @@ def test_accept(self):
         code = '\n'.join((
             'import socket, time',
             '',
-            'host = %r' % socket_helper.HOST,
-            'port = %s' % port,
-            'sleep_time = %r' % self.sleep_time,
+            f'host = {socket_helper.HOST!r}',
+            f'port = {port}',
+            f'sleep_time = {self.sleep_time!r}',
             '',
             '# let parent block on accept()',
             'time.sleep(sleep_time)',
@@ -363,15 +343,15 @@ def _test_open(self, do_open_close_reader, 
do_open_close_writer):
         os_helper.unlink(filename)
         try:
             os.mkfifo(filename)
-        except PermissionError as e:
-            self.skipTest('os.mkfifo(): %s' % e)
+        except PermissionError as exc:
+            self.skipTest(f'os.mkfifo(): {exc!r}')
         self.addCleanup(os_helper.unlink, filename)
 
         code = '\n'.join((
             'import os, time',
             '',
-            'path = %a' % filename,
-            'sleep_time = %r' % self.sleep_time,
+            f'path = {filename!a}',
+            f'sleep_time = {self.sleep_time!r}',
             '',
             '# let the parent block',
             'time.sleep(sleep_time)',
@@ -427,21 +407,20 @@ class SignalEINTRTest(EINTRBaseTest):
 
     def check_sigwait(self, wait_func):
         signum = signal.SIGUSR1
-        pid = os.getpid()
 
         old_handler = signal.signal(signum, lambda *args: None)
         self.addCleanup(signal.signal, signum, old_handler)
 
         code = '\n'.join((
             'import os, time',
-            'pid = %s' % os.getpid(),
-            'signum = %s' % int(signum),
-            'sleep_time = %r' % self.sleep_time,
+            f'pid = {os.getpid()}',
+            f'signum = {int(signum)}',
+            f'sleep_time = {self.sleep_time!r}',
             'time.sleep(sleep_time)',
             'os.kill(pid, signum)',
         ))
 
-        old_mask = signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+        signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
         self.addCleanup(signal.pthread_sigmask, signal.SIG_UNBLOCK, [signum])
 
         proc = self.subprocess(code)

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]

Reply via email to