https://github.com/python/cpython/commit/e5a567b0a721c26c79530249d9aa159afbd11955
commit: e5a567b0a721c26c79530249d9aa159afbd11955
branch: main
author: Serhiy Storchaka <storch...@gmail.com>
committer: serhiy-storchaka <storch...@gmail.com>
date: 2024-08-31T12:30:05+03:00
summary:

gh-123309: Add more tests for the pickletools module (GH-123355)

Add tests for genops() and dis().

files:
M Lib/test/test_pickletools.py

diff --git a/Lib/test/test_pickletools.py b/Lib/test/test_pickletools.py
index d37af79e878a2e..8cb1f6dffcc6be 100644
--- a/Lib/test/test_pickletools.py
+++ b/Lib/test/test_pickletools.py
@@ -1,3 +1,4 @@
+import io
 import pickle
 import pickletools
 from test import support
@@ -62,6 +63,315 @@ def test_optimize_binput_and_memoize(self):
         self.assertNotIn(pickle.BINPUT, pickled2)
 
 
+class SimpleReader:
+    def __init__(self, data):
+        self.data = data
+        self.pos = 0
+
+    def read(self, n):
+        data = self.data[self.pos: self.pos + n]
+        self.pos += n
+        return data
+
+    def readline(self):
+        nl = self.data.find(b'\n', self.pos) + 1
+        if not nl:
+            nl = len(self.data)
+        data = self.data[self.pos: nl]
+        self.pos = nl
+        return data
+
+
+class GenopsTests(unittest.TestCase):
+    def test_genops(self):
+        it = pickletools.genops(b'(I123\nK\x12J\x12\x34\x56\x78t.')
+        self.assertEqual([(item[0].name,) +  item[1:] for item in it], [
+            ('MARK', None, 0),
+            ('INT', 123, 1),
+            ('BININT1', 0x12, 6),
+            ('BININT', 0x78563412, 8),
+            ('TUPLE', None, 13),
+            ('STOP', None, 14),
+        ])
+
+    def test_from_file(self):
+        f = io.BytesIO(b'prefix(I123\nK\x12J\x12\x34\x56\x78t.suffix')
+        self.assertEqual(f.read(6), b'prefix')
+        it = pickletools.genops(f)
+        self.assertEqual([(item[0].name,) +  item[1:] for item in it], [
+            ('MARK', None, 6),
+            ('INT', 123, 7),
+            ('BININT1', 0x12, 12),
+            ('BININT', 0x78563412, 14),
+            ('TUPLE', None, 19),
+            ('STOP', None, 20),
+        ])
+        self.assertEqual(f.read(), b'suffix')
+
+    def test_without_pos(self):
+        f = SimpleReader(b'(I123\nK\x12J\x12\x34\x56\x78t.')
+        it = pickletools.genops(f)
+        self.assertEqual([(item[0].name,) +  item[1:] for item in it], [
+            ('MARK', None, None),
+            ('INT', 123, None),
+            ('BININT1', 0x12, None),
+            ('BININT', 0x78563412, None),
+            ('TUPLE', None, None),
+            ('STOP', None, None),
+        ])
+
+    def test_no_stop(self):
+        it = pickletools.genops(b'N')
+        item = next(it)
+        self.assertEqual(item[0].name, 'NONE')
+        with self.assertRaisesRegex(ValueError,
+                'pickle exhausted before seeing STOP'):
+            next(it)
+
+    def test_truncated_data(self):
+        it = pickletools.genops(b'I123')
+        with self.assertRaisesRegex(ValueError,
+                'no newline found when trying to read stringnl'):
+            next(it)
+        it = pickletools.genops(b'J\x12\x34')
+        with self.assertRaisesRegex(ValueError,
+                'not enough data in stream to read int4'):
+            next(it)
+
+    def test_unknown_opcode(self):
+        it = pickletools.genops(b'N\xff')
+        item = next(it)
+        self.assertEqual(item[0].name, 'NONE')
+        with self.assertRaisesRegex(ValueError,
+                r"at position 1, opcode b'\\xff' unknown"):
+            next(it)
+
+    def test_unknown_opcode_without_pos(self):
+        f = SimpleReader(b'N\xff')
+        it = pickletools.genops(f)
+        item = next(it)
+        self.assertEqual(item[0].name, 'NONE')
+        with self.assertRaisesRegex(ValueError,
+                r"at position <unknown>, opcode b'\\xff' unknown"):
+            next(it)
+
+
+class DisTests(unittest.TestCase):
+    maxDiff = None
+
+    def check_dis(self, data, expected, **kwargs):
+        out = io.StringIO()
+        pickletools.dis(data, out=out, **kwargs)
+        self.assertEqual(out.getvalue(), expected)
+
+    def check_dis_error(self, data, expected, expected_error, **kwargs):
+        out = io.StringIO()
+        with self.assertRaisesRegex(ValueError, expected_error):
+            pickletools.dis(data, out=out, **kwargs)
+        self.assertEqual(out.getvalue(), expected)
+
+    def test_mark(self):
+        self.check_dis(b'(N(tl.', '''\
+    0: (    MARK
+    1: N        NONE
+    2: (        MARK
+    3: t            TUPLE      (MARK at 2)
+    4: l        LIST       (MARK at 0)
+    5: .    STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_indentlevel(self):
+        self.check_dis(b'(N(tl.', '''\
+    0: (    MARK
+    1: N      NONE
+    2: (      MARK
+    3: t        TUPLE      (MARK at 2)
+    4: l      LIST       (MARK at 0)
+    5: .    STOP
+highest protocol among opcodes = 0
+''', indentlevel=2)
+
+    def test_mark_without_pos(self):
+        self.check_dis(SimpleReader(b'(N(tl.'), '''\
+(    MARK
+N        NONE
+(        MARK
+t            TUPLE      (MARK at unknown opcode offset)
+l        LIST       (MARK at unknown opcode offset)
+.    STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_no_mark(self):
+        self.check_dis_error(b'Nt.', '''\
+    0: N    NONE
+    1: t    TUPLE      no MARK exists on stack
+''', 'no MARK exists on stack')
+
+    def test_put(self):
+        self.check_dis(b'Np0\nq\x01r\x02\x00\x00\x00\x94.', '''\
+    0: N    NONE
+    1: p    PUT        0
+    4: q    BINPUT     1
+    6: r    LONG_BINPUT 2
+   11: \\x94 MEMOIZE    (as 3)
+   12: .    STOP
+highest protocol among opcodes = 4
+''')
+
+    def test_put_redefined(self):
+        self.check_dis_error(b'Np1\np1\n.', '''\
+    0: N    NONE
+    1: p    PUT        1
+    4: p    PUT        1
+''', 'memo key 1 already defined')
+        self.check_dis_error(b'Np1\nq\x01.', '''\
+    0: N    NONE
+    1: p    PUT        1
+    4: q    BINPUT     1
+''', 'memo key 1 already defined')
+        self.check_dis_error(b'Np1\nr\x01\x00\x00\x00.', '''\
+    0: N    NONE
+    1: p    PUT        1
+    4: r    LONG_BINPUT 1
+''', 'memo key 1 already defined')
+        self.check_dis_error(b'Np1\n\x94.', '''\
+    0: N    NONE
+    1: p    PUT        1
+    4: \\x94 MEMOIZE    (as 1)
+''', 'memo key None already defined')
+
+    def test_put_empty_stack(self):
+        self.check_dis_error(b'p0\n', '''\
+    0: p    PUT        0
+''', "stack is empty -- can't store into memo")
+
+    def test_put_markobject(self):
+        self.check_dis_error(b'(p0\n', '''\
+    0: (    MARK
+    1: p        PUT        0
+''', "can't store markobject in the memo")
+
+    def test_get(self):
+        self.check_dis(b'(Np1\ng1\nh\x01j\x01\x00\x00\x00t.', '''\
+    0: (    MARK
+    1: N        NONE
+    2: p        PUT        1
+    5: g        GET        1
+    8: h        BINGET     1
+   10: j        LONG_BINGET 1
+   15: t        TUPLE      (MARK at 0)
+   16: .    STOP
+highest protocol among opcodes = 1
+''')
+
+    def test_get_without_put(self):
+        self.check_dis_error(b'g1\n.', '''\
+    0: g    GET        1
+''', 'memo key 1 has never been stored into')
+        self.check_dis_error(b'h\x01.', '''\
+    0: h    BINGET     1
+''', 'memo key 1 has never been stored into')
+        self.check_dis_error(b'j\x01\x00\x00\x00.', '''\
+    0: j    LONG_BINGET 1
+''', 'memo key 1 has never been stored into')
+
+    def test_memo(self):
+        memo = {}
+        self.check_dis(b'Np1\n.', '''\
+    0: N    NONE
+    1: p    PUT        1
+    4: .    STOP
+highest protocol among opcodes = 0
+''', memo=memo)
+        self.check_dis(b'g1\n.', '''\
+    0: g    GET        1
+    3: .    STOP
+highest protocol among opcodes = 0
+''', memo=memo)
+
+    def test_mark_pop(self):
+        self.check_dis(b'(N00N.', '''\
+    0: (    MARK
+    1: N        NONE
+    2: 0        POP
+    3: 0        POP        (MARK at 0)
+    4: N    NONE
+    5: .    STOP
+highest protocol among opcodes = 0
+''')
+
+    def test_too_small_stack(self):
+        self.check_dis_error(b'a', '''\
+    0: a    APPEND
+''', 'tries to pop 2 items from stack with only 0 items')
+        self.check_dis_error(b']a', '''\
+    0: ]    EMPTY_LIST
+    1: a    APPEND
+''', 'tries to pop 2 items from stack with only 1 items')
+
+    def test_no_stop(self):
+        self.check_dis_error(b'N', '''\
+    0: N    NONE
+''', 'pickle exhausted before seeing STOP')
+
+    def test_truncated_data(self):
+        self.check_dis_error(b'NI123', '''\
+    0: N    NONE
+''', 'no newline found when trying to read stringnl')
+        self.check_dis_error(b'NJ\x12\x34', '''\
+    0: N    NONE
+''', 'not enough data in stream to read int4')
+
+    def test_unknown_opcode(self):
+        self.check_dis_error(b'N\xff', '''\
+    0: N    NONE
+''', r"at position 1, opcode b'\\xff' unknown")
+
+    def test_stop_not_empty_stack(self):
+        self.check_dis_error(b']N.', '''\
+    0: ]    EMPTY_LIST
+    1: N    NONE
+    2: .    STOP
+highest protocol among opcodes = 1
+''', r'stack not empty after STOP: \[list\]')
+
+    def test_annotate(self):
+        self.check_dis(b'(Nt.', '''\
+    0: (    MARK Push markobject onto the stack.
+    1: N        NONE Push None on the stack.
+    2: t        TUPLE      (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+    3: .    STOP                       Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=1)
+        self.check_dis(b'(Nt.', '''\
+    0: (    MARK            Push markobject onto the stack.
+    1: N        NONE        Push None on the stack.
+    2: t        TUPLE      (MARK at 0) Build a tuple out of the topmost stack slice, after markobject.
+    3: .    STOP                       Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=20)
+        self.check_dis(b'(((((((ttttttt.', '''\
+    0: (    MARK            Push markobject onto the stack.
+    1: (        MARK        Push markobject onto the stack.
+    2: (            MARK    Push markobject onto the stack.
+    3: (                MARK Push markobject onto the stack.
+    4: (                    MARK Push markobject onto the stack.
+    5: (                        MARK Push markobject onto the stack.
+    6: (                            MARK Push markobject onto the stack.
+    7: t                                TUPLE      (MARK at 6) Build a tuple out of the topmost stack slice, after markobject.
+    8: t                            TUPLE      (MARK at 5) Build a tuple out of the topmost stack slice, after markobject.
+    9: t                        TUPLE      (MARK at 4) Build a tuple out of the topmost stack slice, after markobject.
+   10: t                    TUPLE      (MARK at 3)     Build a tuple out of the topmost stack slice, after markobject.
+   11: t                TUPLE      (MARK at 2)         Build a tuple out of the topmost stack slice, after markobject.
+   12: t            TUPLE      (MARK at 1)             Build a tuple out of the topmost stack slice, after markobject.
+   13: t        TUPLE      (MARK at 0)                 Build a tuple out of the topmost stack slice, after markobject.
+   14: .    STOP                                       Stop the unpickling machine.
+highest protocol among opcodes = 0
+''', annotate=20)
+
+
 class MiscTestCase(unittest.TestCase):
     def test__all__(self):
         not_exported = {

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to