https://github.com/python/cpython/commit/925907ea362c4c014086be48625ac7dd67645cfc
commit: 925907ea362c4c014086be48625ac7dd67645cfc
branch: main
author: mpage <mp...@meta.com>
committer: erlend-aasland <erlend.aasl...@protonmail.com>
date: 2024-01-23T20:25:41+01:00
summary:

gh-113884: Make queue.SimpleQueue thread-safe when the GIL is disabled (#114161)

* use the ParkingLot API to manage waiting threads
* use Argument Clinic's critical section directive to protect queue methods
* remove unnecessary overflow check

Co-authored-by: Erlend E. Aasland <erlend.aasl...@protonmail.com>

files:
A Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst
M Modules/_queuemodule.c
M Modules/clinic/_queuemodule.c.h

diff --git a/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst
new file mode 100644
index 00000000000000..6a39fd2f60ab81
--- /dev/null
+++ b/Misc/NEWS.d/next/Core and Builtins/2024-01-17-00-52-57.gh-issue-113884.CvEjUE.rst
@@ -0,0 +1 @@
+Make :class:`queue.SimpleQueue` thread safe when the GIL is disabled.
diff --git a/Modules/_queuemodule.c b/Modules/_queuemodule.c
index 8fca3cdd0deb18..18b24855c52ad6 100644
--- a/Modules/_queuemodule.c
+++ b/Modules/_queuemodule.c
@@ -3,8 +3,9 @@
 #endif
 
 #include "Python.h"
-#include "pycore_ceval.h"         // _PyEval_MakePendingCalls()
+#include "pycore_ceval.h"         // Py_MakePendingCalls()
 #include "pycore_moduleobject.h"  // _PyModule_GetState()
+#include "pycore_parking_lot.h"
 #include "pycore_time.h"          // _PyTime_t
 
 #include <stdbool.h>
@@ -151,7 +152,9 @@ RingBuf_Get(RingBuf *buf)
     return item;
 }
 
-// Returns 0 on success or -1 if the buffer failed to grow
+// Returns 0 on success or -1 if the buffer failed to grow.
+//
+// Steals a reference to item.
 static int
 RingBuf_Put(RingBuf *buf, PyObject *item)
 {
@@ -164,7 +167,7 @@ RingBuf_Put(RingBuf *buf, PyObject *item)
             return -1;
         }
     }
-    buf->items[buf->put_idx] = Py_NewRef(item);
+    buf->items[buf->put_idx] = item;
     buf->put_idx = (buf->put_idx + 1) % buf->items_cap;
     buf->num_items++;
     return 0;
@@ -184,9 +187,13 @@ RingBuf_IsEmpty(RingBuf *buf)
 
 typedef struct {
     PyObject_HEAD
-    PyThread_type_lock lock;
-    int locked;
+
+    // Are there threads waiting for items
+    bool has_threads_waiting;
+
+    // Items in the queue
     RingBuf buf;
+
     PyObject *weakreflist;
 } simplequeueobject;
 
@@ -209,12 +216,6 @@ simplequeue_dealloc(simplequeueobject *self)
     PyTypeObject *tp = Py_TYPE(self);
 
     PyObject_GC_UnTrack(self);
-    if (self->lock != NULL) {
-        /* Unlock the lock so it's safe to free it */
-        if (self->locked > 0)
-            PyThread_release_lock(self->lock);
-        PyThread_free_lock(self->lock);
-    }
     (void)simplequeue_clear(self);
     if (self->weakreflist != NULL)
         PyObject_ClearWeakRefs((PyObject *) self);
@@ -249,12 +250,6 @@ simplequeue_new_impl(PyTypeObject *type)
     self = (simplequeueobject *) type->tp_alloc(type, 0);
     if (self != NULL) {
         self->weakreflist = NULL;
-        self->lock = PyThread_allocate_lock();
-        if (self->lock == NULL) {
-            Py_DECREF(self);
-            PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
-            return NULL;
-        }
         if (RingBuf_Init(&self->buf) < 0) {
             Py_DECREF(self);
             return NULL;
@@ -264,7 +259,29 @@ simplequeue_new_impl(PyTypeObject *type)
     return (PyObject *) self;
 }
 
+typedef struct {
+    bool handed_off;
+    simplequeueobject *queue;
+    PyObject *item;
+} HandoffData;
+
+static void
+maybe_handoff_item(HandoffData *data, PyObject **item, int has_more_waiters)
+{
+    if (item == NULL) {
+        // No threads were waiting
+        data->handed_off = false;
+    }
+    else {
+        // There was at least one waiting thread, hand off the item
+        *item = data->item;
+        data->handed_off = true;
+    }
+    data->queue->has_threads_waiting = has_more_waiters;
+}
+
 /*[clinic input]
+@critical_section
 _queue.SimpleQueue.put
     item: object
     block: bool = True
@@ -280,21 +297,28 @@ never blocks.  They are provided for compatibility with the Queue class.
 static PyObject *
 _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
                             int block, PyObject *timeout)
-/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
+/*[clinic end generated code: output=4333136e88f90d8b input=a16dbb33363c0fa8]*/
 {
-    /* BEGIN GIL-protected critical section */
-    if (RingBuf_Put(&self->buf, item) < 0)
-        return NULL;
-    if (self->locked) {
-        /* A get() may be waiting, wake it up */
-        self->locked = 0;
-        PyThread_release_lock(self->lock);
+    HandoffData data = {
+        .handed_off = 0,
+        .item = Py_NewRef(item),
+        .queue = self,
+    };
+    if (self->has_threads_waiting) {
+        // Try to hand the item off directly if there are threads waiting
+        _PyParkingLot_Unpark(&self->has_threads_waiting,
+                             (_Py_unpark_fn_t *)maybe_handoff_item, &data);
+    }
+    if (!data.handed_off) {
+        if (RingBuf_Put(&self->buf, item) < 0) {
+            return NULL;
+        }
     }
-    /* END GIL-protected critical section */
     Py_RETURN_NONE;
 }
 
 /*[clinic input]
+@critical_section
 _queue.SimpleQueue.put_nowait
     item: object
 
@@ -307,12 +331,23 @@ for compatibility with the Queue class.
 
 static PyObject *
 _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
-/*[clinic end generated code: output=0990536715efb1f1 input=36b1ea96756b2ece]*/
+/*[clinic end generated code: output=0990536715efb1f1 input=ce949cc2cd8a4119]*/
 {
     return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
 }
 
+static PyObject *
+empty_error(PyTypeObject *cls)
+{
+    PyObject *module = PyType_GetModule(cls);
+    assert(module != NULL);
+    simplequeue_state *state = simplequeue_get_state(module);
+    PyErr_SetNone(state->EmptyError);
+    return NULL;
+}
+
 /*[clinic input]
+@critical_section
 _queue.SimpleQueue.get
 
     cls: defining_class
@@ -335,23 +370,15 @@ in that case).
 static PyObject *
 _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
                             int block, PyObject *timeout_obj)
-/*[clinic end generated code: output=5c2cca914cd1e55b input=5b4047bfbc645ec1]*/
+/*[clinic end generated code: output=5c2cca914cd1e55b input=f7836c65e5839c51]*/
 {
     _PyTime_t endtime = 0;
-    _PyTime_t timeout;
-    PyObject *item;
-    PyLockStatus r;
-    PY_TIMEOUT_T microseconds;
-    PyThreadState *tstate = PyThreadState_Get();
 
     // XXX Use PyThread_ParseTimeoutArg().
 
-    if (block == 0) {
-        /* Non-blocking */
-        microseconds = 0;
-    }
-    else if (timeout_obj != Py_None) {
+    if (block != 0 && !Py_IsNone(timeout_obj)) {
         /* With timeout */
+        _PyTime_t timeout;
         if (_PyTime_FromSecondsObject(&timeout,
                                       timeout_obj, _PyTime_ROUND_CEILING) < 0) {
             return NULL;
@@ -361,65 +388,64 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
                             "'timeout' must be a non-negative number");
             return NULL;
         }
-        microseconds = _PyTime_AsMicroseconds(timeout,
-                                              _PyTime_ROUND_CEILING);
-        if (microseconds > PY_TIMEOUT_MAX) {
-            PyErr_SetString(PyExc_OverflowError,
-                            "timeout value is too large");
-            return NULL;
-        }
         endtime = _PyDeadline_Init(timeout);
     }
-    else {
-        /* Infinitely blocking */
-        microseconds = -1;
-    }
 
-    /* put() signals the queue to be non-empty by releasing the lock.
-     * So we simply try to acquire the lock in a loop, until the condition
-     * (queue non-empty) becomes true.
-     */
-    while (RingBuf_IsEmpty(&self->buf)) {
-        /* First a simple non-blocking try without releasing the GIL */
-        r = PyThread_acquire_lock_timed(self->lock, 0, 0);
-        if (r == PY_LOCK_FAILURE && microseconds != 0) {
-            Py_BEGIN_ALLOW_THREADS
-            r = PyThread_acquire_lock_timed(self->lock, microseconds, 1);
-            Py_END_ALLOW_THREADS
+    for (;;) {
+        if (!RingBuf_IsEmpty(&self->buf)) {
+            return RingBuf_Get(&self->buf);
         }
 
-        if (r == PY_LOCK_INTR && _PyEval_MakePendingCalls(tstate) < 0) {
-            return NULL;
-        }
-        if (r == PY_LOCK_FAILURE) {
-            PyObject *module = PyType_GetModule(cls);
-            simplequeue_state *state = simplequeue_get_state(module);
-            /* Timed out */
-            PyErr_SetNone(state->EmptyError);
-            return NULL;
+        if (!block) {
+            return empty_error(cls);
         }
-        self->locked = 1;
 
-        /* Adjust timeout for next iteration (if any) */
-        if (microseconds > 0) {
-            timeout = _PyDeadline_Get(endtime);
-            microseconds = _PyTime_AsMicroseconds(timeout,
-                                                  _PyTime_ROUND_CEILING);
+        int64_t timeout_ns = -1;
+        if (endtime != 0) {
+            timeout_ns = _PyDeadline_Get(endtime);
+            if (timeout_ns < 0) {
+                return empty_error(cls);
+            }
         }
-    }
 
-    /* BEGIN GIL-protected critical section */
-    item = RingBuf_Get(&self->buf);
-    if (self->locked) {
-        PyThread_release_lock(self->lock);
-        self->locked = 0;
+        bool waiting = 1;
+        self->has_threads_waiting = waiting;
+
+        PyObject *item = NULL;
+        int st = _PyParkingLot_Park(&self->has_threads_waiting, &waiting,
+                                    sizeof(bool), timeout_ns, &item,
+                                    /* detach */ 1);
+        switch (st) {
+            case Py_PARK_OK: {
+                assert(item != NULL);
+                return item;
+            }
+            case Py_PARK_TIMEOUT: {
+                return empty_error(cls);
+            }
+            case Py_PARK_INTR: {
+                // Interrupted
+                if (Py_MakePendingCalls() < 0) {
+                    return NULL;
+                }
+                break;
+            }
+            case Py_PARK_AGAIN: {
+                // This should be impossible with the current implementation of
+                // PyParkingLot, but would be possible if critical sections /
+                // the GIL were released before the thread was added to the
+                // internal thread queue in the parking lot.
+                break;
+            }
+            default: {
+                Py_UNREACHABLE();
+            }
+        }
     }
-    /* END GIL-protected critical section */
-
-    return item;
 }
 
 /*[clinic input]
+@critical_section
 _queue.SimpleQueue.get_nowait
 
     cls: defining_class
@@ -434,12 +460,13 @@ raise the Empty exception.
 static PyObject *
 _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
                                    PyTypeObject *cls)
-/*[clinic end generated code: output=620c58e2750f8b8a input=842f732bf04216d3]*/
+/*[clinic end generated code: output=620c58e2750f8b8a input=d48be63633fefae9]*/
 {
     return _queue_SimpleQueue_get_impl(self, cls, 0, Py_None);
 }
 
 /*[clinic input]
+@critical_section
 _queue.SimpleQueue.empty -> bool
 
 Return True if the queue is empty, False otherwise (not reliable!).
@@ -447,12 +474,13 @@ Return True if the queue is empty, False otherwise (not reliable!).
 
 static int
 _queue_SimpleQueue_empty_impl(simplequeueobject *self)
-/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
+/*[clinic end generated code: output=1a02a1b87c0ef838 input=96cb22df5a67d831]*/
 {
     return RingBuf_IsEmpty(&self->buf);
 }
 
 /*[clinic input]
+@critical_section
 _queue.SimpleQueue.qsize -> Py_ssize_t
 
 Return the approximate size of the queue (not reliable!).
@@ -460,7 +488,7 @@ Return the approximate size of the queue (not reliable!).
 
 static Py_ssize_t
 _queue_SimpleQueue_qsize_impl(simplequeueobject *self)
-/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
+/*[clinic end generated code: output=f9dcd9d0a90e121e input=e218623cb8c16a79]*/
 {
     return RingBuf_Len(&self->buf);
 }
diff --git a/Modules/clinic/_queuemodule.c.h b/Modules/clinic/_queuemodule.c.h
index 8e2a430835e35f..b3b6b8e96c135e 100644
--- a/Modules/clinic/_queuemodule.c.h
+++ b/Modules/clinic/_queuemodule.c.h
@@ -6,6 +6,7 @@ preserve
 #  include "pycore_gc.h"          // PyGC_Head
 #  include "pycore_runtime.h"     // _Py_ID()
 #endif
+#include "pycore_critical_section.h"// Py_BEGIN_CRITICAL_SECTION()
 #include "pycore_modsupport.h"    // _PyArg_NoKeywords()
 
 PyDoc_STRVAR(simplequeue_new__doc__,
@@ -107,7 +108,9 @@ _queue_SimpleQueue_put(simplequeueobject *self, PyObject *const *args, Py_ssize_
     }
     timeout = args[2];
 skip_optional_pos:
+    Py_BEGIN_CRITICAL_SECTION(self);
     return_value = _queue_SimpleQueue_put_impl(self, item, block, timeout);
+    Py_END_CRITICAL_SECTION();
 
 exit:
     return return_value;
@@ -165,7 +168,9 @@ _queue_SimpleQueue_put_nowait(simplequeueobject *self, PyObject *const *args, Py
         goto exit;
     }
     item = args[0];
+    Py_BEGIN_CRITICAL_SECTION(self);
     return_value = _queue_SimpleQueue_put_nowait_impl(self, item);
+    Py_END_CRITICAL_SECTION();
 
 exit:
     return return_value;
@@ -244,7 +249,9 @@ _queue_SimpleQueue_get(simplequeueobject *self, PyTypeObject *cls, PyObject *con
     }
     timeout_obj = args[1];
 skip_optional_pos:
+    Py_BEGIN_CRITICAL_SECTION(self);
     return_value = _queue_SimpleQueue_get_impl(self, cls, block, timeout_obj);
+    Py_END_CRITICAL_SECTION();
 
 exit:
     return return_value;
@@ -269,11 +276,18 @@ _queue_SimpleQueue_get_nowait_impl(simplequeueobject *self,
 static PyObject *
 _queue_SimpleQueue_get_nowait(simplequeueobject *self, PyTypeObject *cls, PyObject *const *args, Py_ssize_t nargs, PyObject *kwnames)
 {
+    PyObject *return_value = NULL;
+
     if (nargs) {
         PyErr_SetString(PyExc_TypeError, "get_nowait() takes no arguments");
-        return NULL;
+        goto exit;
     }
-    return _queue_SimpleQueue_get_nowait_impl(self, cls);
+    Py_BEGIN_CRITICAL_SECTION(self);
+    return_value = _queue_SimpleQueue_get_nowait_impl(self, cls);
+    Py_END_CRITICAL_SECTION();
+
+exit:
+    return return_value;
 }
 
 PyDoc_STRVAR(_queue_SimpleQueue_empty__doc__,
@@ -294,7 +308,9 @@ _queue_SimpleQueue_empty(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
     PyObject *return_value = NULL;
     int _return_value;
 
+    Py_BEGIN_CRITICAL_SECTION(self);
     _return_value = _queue_SimpleQueue_empty_impl(self);
+    Py_END_CRITICAL_SECTION();
     if ((_return_value == -1) && PyErr_Occurred()) {
         goto exit;
     }
@@ -322,7 +338,9 @@ _queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
     PyObject *return_value = NULL;
     Py_ssize_t _return_value;
 
+    Py_BEGIN_CRITICAL_SECTION(self);
     _return_value = _queue_SimpleQueue_qsize_impl(self);
+    Py_END_CRITICAL_SECTION();
     if ((_return_value == -1) && PyErr_Occurred()) {
         goto exit;
     }
@@ -331,4 +349,4 @@ _queue_SimpleQueue_qsize(simplequeueobject *self, PyObject *Py_UNUSED(ignored))
 exit:
     return return_value;
 }
-/*[clinic end generated code: output=457310b20cb61cf8 input=a9049054013a1b77]*/
+/*[clinic end generated code: output=242950edc8f7dfd7 input=a9049054013a1b77]*/

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to