https://github.com/python/cpython/commit/388e1ca9f08ee5caefd1dd946dc6e236ce73d46f
commit: 388e1ca9f08ee5caefd1dd946dc6e236ce73d46f
branch: main
author: T. Wouters <[email protected]>
committer: Yhg1s <[email protected]>
date: 2025-02-18T16:52:46-08:00
summary:

gh-115999: Make list and tuple iteration more thread-safe. (#128637)

Make tuple iteration more thread-safe, and actually test concurrent iteration 
of tuple, range and list. (This is prep work for enabling specialization of 
FOR_ITER in free-threaded builds.) The basic premise is:

Iterating over a shared iterable (list, tuple or range) should be safe, not 
involve data races, and behave like iteration normally does.

Using a shared iterator should not crash or involve data races, and should only 
produce items regular iteration would produce. It is not guaranteed to produce 
all items, or produce each item only once. (Range iteration does not yet meet 
this bar even after this PR: a shared range iterator can still produce items 
past the end of the range.)

Providing stronger guarantees is possible for some of these iterators, but it's 
not always straightforward and can significantly hamper the common case. Since 
iterators in general aren't shared between threads, and it's simply impossible 
to concurrently use many iterators (like generators), better to make sharing 
iterators without explicit synchronization clearly wrong.

Specific issues fixed in order to make the tests pass:

 - List iteration could occasionally fail an assertion when a shared list was 
shrunk and an item past the new end was retrieved concurrently. There's still 
some unsafety when deleting/inserting multiple items through for example slice 
assignment, which uses memmove/memcpy.

 - Tuple iteration could occasionally crash when the iterator's reference to 
the tuple was cleared on exhaustion. Like with list iteration, in free-threaded 
builds we can't safely and efficiently clear the iterator's reference to the 
iterable (doing it safely would mean extra, slow refcount operations), so just 
keep the iterable reference around.

files:
A Lib/test/test_free_threading/test_iteration.py
M Objects/listobject.c
M Objects/tupleobject.c
M Tools/tsan/suppressions_free_threading.txt

diff --git a/Lib/test/test_free_threading/test_iteration.py b/Lib/test/test_free_threading/test_iteration.py
new file mode 100644
index 00000000000000..a51ad0cf83a006
--- /dev/null
+++ b/Lib/test/test_free_threading/test_iteration.py
@@ -0,0 +1,127 @@
+import threading
+import unittest
+from test import support
+
+# The race conditions these tests were written for only happen every now and
+# then, even with the current numbers. To find rare race conditions, bumping
+# these up will help, but it makes the test runtime highly variable under
+# free-threading. Overhead is much higher under ThreadSanitizer, but it's
+# also much better at detecting certain races, so we don't need as many
+# items/threads.
+if support.check_sanitizer(thread=True):
+    NUMITEMS = 1000
+    NUMTHREADS = 2
+else:
+    NUMITEMS = 100000
+    NUMTHREADS = 5
+NUMMUTATORS = 2
+
+class ContendedTupleIterationTest(unittest.TestCase):
+    def make_testdata(self, n):
+        return tuple(range(n))
+
+    def assert_iterator_results(self, results, expected):
+        # Most iterators are not atomic (yet?) so they can skip or duplicate
+        # items, but they should not invent new items (like the range
+        # iterator currently does).
+        extra_items = set(results) - set(expected)
+        self.assertEqual(set(), extra_items)
+
+    def run_threads(self, func, *args, numthreads=NUMTHREADS):
+        threads = []
+        for _ in range(numthreads):
+            t = threading.Thread(target=func, args=args)
+            t.start()
+            threads.append(t)
+        return threads
+
+    def test_iteration(self):
+        """Test iteration over a shared container"""
+        seq = self.make_testdata(NUMITEMS)
+        results = []
+        start = threading.Barrier(NUMTHREADS)
+        def worker():
+            idx = 0
+            start.wait()
+            for item in seq:
+                idx += 1
+            results.append(idx)
+        threads = self.run_threads(worker)
+        for t in threads:
+            t.join()
+        # Each thread has its own iterator, so results should be entirely predictable.
+        self.assertEqual(results, [NUMITEMS] * NUMTHREADS)
+
+    def test_shared_iterator(self):
+        """Test iteration over a shared iterator"""
+        seq = self.make_testdata(NUMITEMS)
+        it = iter(seq)
+        results = []
+        start = threading.Barrier(NUMTHREADS)
+        def worker():
+            items = []
+            start.wait()
+            # We want a tight loop, so put items in the shared list at the end.
+            for item in it:
+                items.append(item)
+            results.extend(items)
+        threads = self.run_threads(worker)
+        for t in threads:
+            t.join()
+        self.assert_iterator_results(results, seq)
+
+class ContendedListIterationTest(ContendedTupleIterationTest):
+    def make_testdata(self, n):
+        return list(range(n))
+
+    def test_iteration_while_mutating(self):
+        """Test iteration over a shared mutating container."""
+        seq = self.make_testdata(NUMITEMS)
+        results = []
+        start = threading.Barrier(NUMTHREADS + NUMMUTATORS)
+        endmutate = threading.Event()
+        def mutator():
+            orig = seq[:]
+            # Make changes big enough to cause resizing of the list, with
+            # items shifted around for good measure.
+            replacement = (orig * 3)[NUMITEMS//2:]
+            start.wait()
+            while not endmutate.is_set():
+                seq.extend(replacement)
+                seq[:0] = orig
+                seq.__imul__(2)
+                seq.extend(seq)
+                seq[:] = orig
+        def worker():
+            items = []
+            start.wait()
+            # We want a tight loop, so put items in the shared list at the end.
+            for item in seq:
+                items.append(item)
+            results.extend(items)
+        mutators = ()
+        try:
+            threads = self.run_threads(worker)
+            mutators = self.run_threads(mutator, numthreads=NUMMUTATORS)
+            for t in threads:
+                t.join()
+        finally:
+            endmutate.set()
+            for m in mutators:
+                m.join()
+        self.assert_iterator_results(results, list(seq))
+
+
+class ContendedRangeIterationTest(ContendedTupleIterationTest):
+    def make_testdata(self, n):
+        return range(n)
+
+    def assert_iterator_results(self, results, expected):
+        # Range iterators that are shared between threads will (right now)
+        # sometimes produce items after the end of the range, sometimes
+        # _far_ after the end of the range. That should be fixed, but for
+        # now, let's just check they're integers that could have resulted
+        # from stepping beyond the range bounds.
+        extra_items = set(results) - set(expected)
+        for item in extra_items:
+            self.assertEqual((item - expected.start) % expected.step, 0)
diff --git a/Objects/listobject.c b/Objects/listobject.c
index 120e353b709e7b..84faa5a32a1f2a 100644
--- a/Objects/listobject.c
+++ b/Objects/listobject.c
@@ -357,7 +357,7 @@ list_get_item_ref(PyListObject *op, Py_ssize_t i)
         return NULL;
     }
     Py_ssize_t cap = list_capacity(ob_item);
-    assert(cap != -1 && cap >= size);
+    assert(cap != -1);
     if (!valid_index(i, cap)) {
         return NULL;
     }
@@ -784,7 +784,8 @@ list_repeat_lock_held(PyListObject *a, Py_ssize_t n)
             _Py_RefcntAdd(*src, n);
             *dest++ = *src++;
         }
-
+        // TODO: _Py_memory_repeat calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         _Py_memory_repeat((char *)np->ob_item, sizeof(PyObject *)*output_size,
                                         sizeof(PyObject *)*input_size);
     }
@@ -919,6 +920,8 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
     if (d < 0) { /* Delete -d items */
         Py_ssize_t tail;
         tail = (Py_SIZE(a) - ihigh) * sizeof(PyObject *);
+        // TODO: these memmove/memcpy calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         memmove(&item[ihigh+d], &item[ihigh], tail);
         if (list_resize(a, Py_SIZE(a) + d) < 0) {
             memmove(&item[ihigh], &item[ihigh+d], tail);
@@ -932,12 +935,14 @@ list_ass_slice_lock_held(PyListObject *a, Py_ssize_t ilow, Py_ssize_t ihigh, PyO
         if (list_resize(a, k+d) < 0)
             goto Error;
         item = a->ob_item;
+        // TODO: these memmove/memcpy calls are not safe for shared lists in
+        // GIL_DISABLED builds. (See issue #129069)
         memmove(&item[ihigh+d], &item[ihigh],
             (k - ihigh)*sizeof(PyObject *));
     }
     for (k = 0; k < n; k++, ilow++) {
         PyObject *w = vitem[k];
-        item[ilow] = Py_XNewRef(w);
+        FT_ATOMIC_STORE_PTR_RELEASE(item[ilow], Py_XNewRef(w));
     }
     for (k = norig - 1; k >= 0; --k)
         Py_XDECREF(recycle[k]);
@@ -1017,6 +1022,8 @@ list_inplace_repeat_lock_held(PyListObject *self, Py_ssize_t n)
     for (Py_ssize_t j = 0; j < input_size; j++) {
         _Py_RefcntAdd(items[j], n-1);
     }
+    // TODO: _Py_memory_repeat calls are not safe for shared lists in
+    // GIL_DISABLED builds. (See issue #129069)
     _Py_memory_repeat((char *)items, sizeof(PyObject *)*output_size,
                       sizeof(PyObject *)*input_size);
     return 0;
@@ -3993,7 +4000,7 @@ listiter_setstate(PyObject *self, PyObject *state)
             index = -1;
         else if (index > PyList_GET_SIZE(it->it_seq))
             index = PyList_GET_SIZE(it->it_seq); /* iterator exhausted */
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
@@ -4145,7 +4152,7 @@ listreviter_setstate(PyObject *self, PyObject *state)
             index = -1;
         else if (index > PyList_GET_SIZE(it->it_seq) - 1)
             index = PyList_GET_SIZE(it->it_seq) - 1;
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
@@ -4162,18 +4169,19 @@ listiter_reduce_general(void *_it, int forward)
      * call must be before access of iterator pointers.
      * see issue #101765 */
 
-    /* the objects are not the same, index is of different types! */
     if (forward) {
         iter = _PyEval_GetBuiltin(&_Py_ID(iter));
         _PyListIterObject *it = (_PyListIterObject *)_it;
-        if (it->it_index >= 0) {
-            return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
+        Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+        if (idx >= 0) {
+            return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
         }
     } else {
         iter = _PyEval_GetBuiltin(&_Py_ID(reversed));
         listreviterobject *it = (listreviterobject *)_it;
-        if (it->it_index >= 0) {
-            return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
+        Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+        if (idx >= 0) {
+            return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
         }
     }
     /* empty iterator, create an empty list */
diff --git a/Objects/tupleobject.c b/Objects/tupleobject.c
index 60af9e40e3fe83..b7416a5a1c5372 100644
--- a/Objects/tupleobject.c
+++ b/Objects/tupleobject.c
@@ -1014,18 +1014,23 @@ tupleiter_next(PyObject *self)
 
     assert(it != NULL);
     seq = it->it_seq;
+#ifndef Py_GIL_DISABLED
     if (seq == NULL)
         return NULL;
+#endif
     assert(PyTuple_Check(seq));
 
-    if (it->it_index < PyTuple_GET_SIZE(seq)) {
-        item = PyTuple_GET_ITEM(seq, it->it_index);
-        ++it->it_index;
+    Py_ssize_t index = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    if (index < PyTuple_GET_SIZE(seq)) {
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index + 1);
+        item = PyTuple_GET_ITEM(seq, index);
         return Py_NewRef(item);
     }
 
+#ifndef Py_GIL_DISABLED
     it->it_seq = NULL;
     Py_DECREF(seq);
+#endif
     return NULL;
 }
 
@@ -1034,8 +1039,15 @@ tupleiter_len(PyObject *self, PyObject *Py_UNUSED(ignored))
 {
     _PyTupleIterObject *it = _PyTupleIterObject_CAST(self);
     Py_ssize_t len = 0;
+#ifdef Py_GIL_DISABLED
+    Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    Py_ssize_t seq_len = PyTuple_GET_SIZE(it->it_seq);
+    if (idx < seq_len)
+        len = seq_len - idx;
+#else
     if (it->it_seq)
         len = PyTuple_GET_SIZE(it->it_seq) - it->it_index;
+#endif
     return PyLong_FromSsize_t(len);
 }
 
@@ -1051,10 +1063,15 @@ tupleiter_reduce(PyObject *self, PyObject *Py_UNUSED(ignored))
      * see issue #101765 */
     _PyTupleIterObject *it = _PyTupleIterObject_CAST(self);
 
+#ifdef Py_GIL_DISABLED
+    Py_ssize_t idx = FT_ATOMIC_LOAD_SSIZE_RELAXED(it->it_index);
+    if (idx < PyTuple_GET_SIZE(it->it_seq))
+        return Py_BuildValue("N(O)n", iter, it->it_seq, idx);
+#else
     if (it->it_seq)
         return Py_BuildValue("N(O)n", iter, it->it_seq, it->it_index);
-    else
-        return Py_BuildValue("N(())", iter);
+#endif
+    return Py_BuildValue("N(())", iter);
 }
 
 static PyObject *
@@ -1069,7 +1086,7 @@ tupleiter_setstate(PyObject *self, PyObject *state)
             index = 0;
         else if (index > PyTuple_GET_SIZE(it->it_seq))
             index = PyTuple_GET_SIZE(it->it_seq); /* exhausted iterator */
-        it->it_index = index;
+        FT_ATOMIC_STORE_SSIZE_RELAXED(it->it_index, index);
     }
     Py_RETURN_NONE;
 }
diff --git a/Tools/tsan/suppressions_free_threading.txt b/Tools/tsan/suppressions_free_threading.txt
index e4cf2a76db35cc..1802473fea3fcc 100644
--- a/Tools/tsan/suppressions_free_threading.txt
+++ b/Tools/tsan/suppressions_free_threading.txt
@@ -18,12 +18,10 @@ race:set_allocator_unlocked
 # https://gist.github.com/swtaarrs/08dfe7883b4c975c31ecb39388987a67
 race:free_threadstate
 
-
 # These warnings trigger directly in a CPython function.
 
 race_top:assign_version_tag
 race_top:_multiprocessing_SemLock_acquire_impl
-race_top:list_get_item_ref
 race_top:_Py_slot_tp_getattr_hook
 race_top:add_threadstate
 race_top:dump_traceback
@@ -54,3 +52,12 @@ race_top:update_one_slot
 
 # https://gist.github.com/mpage/6962e8870606cfc960e159b407a0cb40
 thread:pthread_create
+
+# Range iteration is not thread-safe yet (issue #129068)
+race_top:rangeiter_next
+
+# List resizing happens through different paths ending in memcpy or memmove
+# (for efficiency), which will probably need to be rewritten as explicit loops
+# of ptr-sized copies to be thread-safe. (Issue #129069)
+race:list_ass_slice_lock_held
+race:list_inplace_repeat_lock_held

_______________________________________________
Python-checkins mailing list -- [email protected]
To unsubscribe send an email to [email protected]
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: [email protected]

Reply via email to