From c5cdccfe374ff9d45219f705511785318833f6eb Mon Sep 17 00:00:00 2001
From: Jacob Champion <jacob.champion@enterprisedb.com>
Date: Tue, 10 Jun 2025 16:38:59 -0700
Subject: [PATCH v3 1/5] oauth: Remove stale events from the kqueue multiplexer

If a socket is added to the kqueue, becomes readable/writable, and
subsequently becomes non-readable/writable again, the kqueue itself will
remain readable until either the socket registration is removed, or the
stale event is cleared via a call to kevent().

In many simple cases, Curl itself will remove the socket registration
quickly, but in real-world usage, this is not guaranteed to happen. The
kqueue can then remain stuck in a permanently readable state until the
request ends, which results in pointless wakeups for the client and
wasted CPU time.

Implement comb_multiplexer() to call kevent() and unstick any stale
events that would cause unnecessary callbacks. This is called right
after drive_request(), before we return control to the client to wait.

Suggested-by: Thomas Munro <thomas.munro@gmail.com>
---
 src/interfaces/libpq-oauth/oauth-curl.c | 74 +++++++++++++++++++++++--
 1 file changed, 68 insertions(+), 6 deletions(-)

diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c
index dba9a684fa8..3380a17628e 100644
--- a/src/interfaces/libpq-oauth/oauth-curl.c
+++ b/src/interfaces/libpq-oauth/oauth-curl.c
@@ -1376,6 +1376,60 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx,
 #endif
 }
 
+/*-------
+ * If there is no work to do on any of the descriptors in the multiplexer, then
+ * this function must ensure that the multiplexer is not readable.
+ *
+ * As a motivating example, consider the following sequence of events:
+ * 1. libcurl tries to write data to the send buffer, but it fills up.
+ * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the
+ *    client to wait.
+ * 3. The kernel partially drains the send buffer. The socket becomes writable,
+ *    and the client wakes up and calls back into the flow.
+ * 4. libcurl continues writing data to the send buffer, but it fills up again.
+ *    The socket is no longer writable.
+ *
+ * At this point, an epoll-based mux no longer signals readiness, so nothing
+ * further needs to be done. But a kqueue-based mux will continue to signal
+ * "ready" until either the EVFILT_WRITE registration is dropped for the socket,
+ * or the old socket-writable event is read from the queue. Since Curl isn't
+ * guaranteed to do the former, we must do the latter here.
+ */
+static bool
+comb_multiplexer(struct async_ctx *actx)
+{
+#if defined(HAVE_SYS_EPOLL_H)
+	/* The epoll implementation doesn't hold onto stale events. */
+	return true;
+#elif defined(HAVE_SYS_EVENT_H)
+	struct timespec timeout = {0};
+	struct kevent ev;
+
+	/*
+	 * Try to read a single pending event. We can actually ignore the result:
+	 * either we found an event to process, in which case the multiplexer is
+	 * correctly readable for that event at minimum, and it doesn't matter if
+	 * there are any stale events; or we didn't find any, in which case the
+	 * kernel will have discarded any stale events as it traveled to the end
+	 * of the queue.
+	 *
+	 * Note that this depends on our registrations being level-triggered --
+	 * even the timer, so we use a chained kqueue for that instead of an
+	 * EVFILT_TIMER on the top-level mux. If we used edge-triggered events,
+	 * this call would improperly discard them.
+	 */
+	if (kevent(actx->mux, NULL, 0, &ev, 1, &timeout) < 0)
+	{
+		actx_error(actx, "could not comb kqueue: %m");
+		return false;
+	}
+
+	return true;
+#else
+#error comb_multiplexer is not implemented on this platform
+#endif
+}
+
 /*
  * Enables or disables the timer in the multiplexer set. The timeout value is
  * in milliseconds (negative values disable the timer).
@@ -2755,13 +2809,21 @@ pg_fe_run_oauth_flow_impl(PGconn *conn)
 
 					if (status == PGRES_POLLING_FAILED)
 						goto error_return;
-					else if (status != PGRES_POLLING_OK)
-					{
-						/* not done yet */
-						return status;
-					}
+					else if (status == PGRES_POLLING_OK)
+						break;	/* done! */
 
-					break;
+					/*
+					 * This request is still running.
+					 *
+					 * Make sure that stale events don't cause us to come back
+					 * early. (Currently, this can occur only with kqueue.) If
+					 * this is forgotten, the multiplexer can get stuck in a
+					 * signalled state and we'll burn CPU cycles pointlessly.
+					 */
+					if (!comb_multiplexer(actx))
+						goto error_return;
+
+					return status;
 				}
 
 			case OAUTH_STEP_WAIT_INTERVAL:
-- 
2.34.1

