https://github.com/python/cpython/commit/405a2d74cbdef5a899c900b6897ec85fe465abd2
commit: 405a2d74cbdef5a899c900b6897ec85fe465abd2
branch: main
author: Pieter Eendebak <pieter.eende...@gmail.com>
committer: kumaraditya303 <kumaradi...@python.org>
date: 2025-03-12T15:30:33+05:30
summary:
gh-123471: make `itertools.batched` thread-safe (#129416) files: A Lib/test/test_free_threading/test_itertools_batched.py A Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst M Modules/itertoolsmodule.c diff --git a/Lib/test/test_free_threading/test_itertools_batched.py b/Lib/test/test_free_threading/test_itertools_batched.py new file mode 100644 index 00000000000000..fa9e06bf07fa26 --- /dev/null +++ b/Lib/test/test_free_threading/test_itertools_batched.py @@ -0,0 +1,39 @@ +import unittest +import sys +from threading import Thread, Barrier +from itertools import batched +from test.support import threading_helper + + +threading_helper.requires_working_threading(module=True) + +class EnumerateThreading(unittest.TestCase): + + @threading_helper.reap_threads + def test_threading(self): + number_of_threads = 10 + number_of_iterations = 20 + barrier = Barrier(number_of_threads) + def work(it): + barrier.wait() + while True: + try: + _ = next(it) + except StopIteration: + break + + data = tuple(range(1000)) + for it in range(number_of_iterations): + batch_iterator = batched(data, 2) + worker_threads = [] + for ii in range(number_of_threads): + worker_threads.append( + Thread(target=work, args=[batch_iterator])) + + with threading_helper.start_threads(worker_threads): + pass + + barrier.reset() + +if __name__ == "__main__": + unittest.main() diff --git a/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst new file mode 100644 index 00000000000000..f34d0bcd4c1e37 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-02-19-08-06-37.gh-issue-123471.br7uyR.rst @@ -0,0 +1 @@ +Make concurrent iterations over :class:`itertools.batched` safe under free-threading. 
diff --git a/Modules/itertoolsmodule.c b/Modules/itertoolsmodule.c index 3e425ee5f92dcd..40e436a6717ba7 100644 --- a/Modules/itertoolsmodule.c +++ b/Modules/itertoolsmodule.c @@ -191,12 +191,12 @@ batched_next(PyObject *op) { batchedobject *bo = batchedobject_CAST(op); Py_ssize_t i; - Py_ssize_t n = bo->batch_size; + Py_ssize_t n = FT_ATOMIC_LOAD_SSIZE_RELAXED(bo->batch_size); PyObject *it = bo->it; PyObject *item; PyObject *result; - if (it == NULL) { + if (n < 0) { return NULL; } result = PyTuple_New(n); @@ -218,19 +218,28 @@ batched_next(PyObject *op) if (PyErr_Occurred()) { if (!PyErr_ExceptionMatches(PyExc_StopIteration)) { /* Input raised an exception other than StopIteration */ + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); +#endif Py_DECREF(result); return NULL; } PyErr_Clear(); } if (i == 0) { + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); +#endif Py_DECREF(result); return NULL; } if (bo->strict) { + FT_ATOMIC_STORE_SSIZE_RELAXED(bo->batch_size, -1); +#ifndef Py_GIL_DISABLED Py_CLEAR(bo->it); +#endif Py_DECREF(result); PyErr_SetString(PyExc_ValueError, "batched(): incomplete batch"); return NULL; _______________________________________________ Python-checkins mailing list -- python-checkins@python.org To unsubscribe send an email to python-checkins-le...@python.org https://mail.python.org/mailman3/lists/python-checkins.python.org/ Member address: arch...@mail-archive.com