On Wed, Aug 6, 2025 at 9:13 AM Jacob Champion <[email protected]> wrote:
> Maybe "drain" would no longer be the
> verb to use there.

I keep describing this as "combing" the queue when I talk about it in person, so v3-0001 renames this new operation to comb_multiplexer(). And the CI (plus the more strenuous TLS tests) confirms that the callback count is still stable with this weaker guarantee, so I've gotten rid of the event-counting code.

Now that I'm no longer counting events, I can collapse the changes to register_socket(). I can't revert those changes entirely, because then we regress the case where Curl switches a socket from IN to OUT (this is enforced by the new unit tests). But I'm not sure that the existing comment adequately explained that fix anyway, and I didn't remember to call it out in my initial email, so I've split it out into v3-0002. It's much smaller.

The tests (now in 0005) have been adjusted for the new "combing" behavior, and I've added a case to ensure that multiple stale events are swept up by a single call to comb_multiplexer().

Thanks!
--Jacob
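For readers without a BSD machine handy, the shape of the new two-pipe test case can be sketched with plain POSIX poll() standing in for the mux. This is only an illustration under stated assumptions: fd_is_readable(), drain_bytes(), and two_pipes_ready_then_idle() are hypothetical names that are not part of the patchset, and poll() re-checks readiness on every call, so there is nothing stale to comb out here. It shows the "make readable, check, drain, check again" sequence the test follows, not the kqueue behavior itself.

```c
#include <assert.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

/* Hypothetical stand-in for a mux readiness check: is fd readable right now? */
static bool
fd_is_readable(int fd)
{
	struct pollfd pfd = {.fd = fd, .events = POLLIN};

	assert(fd >= 0);

	/* Zero timeout: report the current state without blocking. */
	return poll(&pfd, 1, 0) == 1 && (pfd.revents & POLLIN) != 0;
}

/* Read exactly n bytes from fd. Returns false on error or EOF. */
static bool
drain_bytes(int fd, size_t n)
{
	char		buf[64];

	while (n > 0)
	{
		size_t		want = (n < sizeof(buf)) ? n : sizeof(buf);
		ssize_t		got = read(fd, buf, want);

		if (got <= 0)
			return false;
		n -= (size_t) got;
	}

	return true;
}

/*
 * Make two pipes readable, confirm both are reported ready, drain them, and
 * confirm neither is reported ready afterwards. Returns true if every step
 * behaved as expected.
 */
static bool
two_pipes_ready_then_idle(void)
{
	int			a[2];
	int			b[2];
	bool		ok;

	if (pipe(a) != 0)
		return false;
	if (pipe(b) != 0)
	{
		close(a[0]);
		close(a[1]);
		return false;
	}

	ok = write(a[1], "x", 1) == 1
		&& write(b[1], "x", 1) == 1
		&& fd_is_readable(a[0])
		&& fd_is_readable(b[0])
		&& drain_bytes(a[0], 1)
		&& drain_bytes(b[0], 1)
		&& !fd_is_readable(a[0])
		&& !fd_is_readable(b[0]);

	close(a[0]);
	close(a[1]);
	close(b[0]);
	close(b[1]);

	return ok;
}
```

In the actual test the final check goes through mux_is_not_ready() after comb_multiplexer(), since the mux (unlike this poll() stand-in) may still be holding events for fds that have since been drained.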
1: 379c12b5d26 < -: ----------- oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 1: c5cdccfe374 oauth: Remove stale events from the kqueue multiplexer
-: ----------- > 2: 7725e0c173b oauth: Ensure unused socket registrations are removed
2: f30317d7265 ! 3: 6ccf7a5d156 oauth: Remove expired timers from the multiplexer
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow_impl(PGconn *conn)
+ * edge-triggered) timeouts, and ours are level-triggered
+ * via the mux.
+ *
-+ * This can't be combined with the drain_socket_events()
-+ * call below: we might accidentally clear a short timeout
-+ * that was both set and expired during the call to
++ * This can't be combined with the comb_multiplexer() call
++ * below: we might accidentally clear a short timeout that
++ * was both set and expired during the call to
+ * drive_request().
+ */
+ if (!drain_timer_events(actx, NULL))
3: d243d28964d ! 4: 2be993b8f07 oauth: Track total call count during a client flow
@@ Metadata
## Commit message ##
oauth: Track total call count during a client flow
- Tracking down the bugs that led to the addition of drain_socket_events()
+ Tracking down the bugs that led to the addition of comb_multiplexer()
and drain_timer_events() was difficult, because an inefficient flow is
not visibly different from one that is working properly. To help
maintainers notice when something has gone wrong, track the number of
@@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
bool used_basic_auth; /* did we send a client secret? */
bool debugging; /* can we give unsafe developer assistance? */
+ int dbg_num_calls; /* (debug mode) how many times were we called? */
+ };
- #if defined(HAVE_SYS_EVENT_H)
- int nevents; /* how many events are we waiting on? */
+ /*
@@ src/interfaces/libpq-oauth/oauth-curl.c: PostgresPollingStatusType
pg_fe_run_oauth_flow(PGconn *conn)
{
@@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow(PGconn *conn)
result = pg_fe_run_oauth_flow_impl(conn);
+ /*
-+ * To assist with finding bugs in drain_socket_events() and
++ * To assist with finding bugs in comb_multiplexer() and
+ * drain_timer_events(), when we're in debug mode, track the total number
+ * of calls to this function and print that at the end of the flow.
+ *
+ *
4: ca6fd237653 ! 5: 50257bf32eb oauth: Add unit tests for multiplexer handling
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /*
+ * Draining the pipe should unset the multiplexer again, once the old
-+ * event is drained.
++ * event is cleared.
+ */
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd is drained");
+
+ /* Undo any unidirectional emulation. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ ssize_t written;
+
+ /*
-+ * Fill the pipe. Once the old writable event is drained, the mux
++ * Fill the pipe. Once the old writable event is cleared, the mux
+ * should not be ready.
+ */
+ Assert((written = fill_pipe(wfd)) > 0);
+ printf("# pipe buffer is full at %zd bytes\n", written);
+
-+ Assert(drain_socket_events(actx));
++ Assert(comb_multiplexer(actx));
+ mux_is_not_ready(actx->mux, "when fd buffer is full");
+
+ /* Drain the pipe again. */
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+
+ /* Make sure an expired timer doesn't interfere with event draining. */
+ {
++ bool expired;
++
+ /* Make the rfd appear unidirectional if necessary. */
+ if (bidirectional)
+ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ * Draining the pipe should unset the multiplexer again, once the
+ * old event is drained and the timer is reset.
+ *
-+ * Order matters to avoid false negatives. First drain the socket,
-+ * then unset the timer. We're trying to catch the case where the
-+ * pending timer expiration event takes the place of one of the
-+ * socket events we're attempting to drain.
++ * Order matters, since comb_multiplexer() doesn't have to remove
++ * stale events when active events exist. Follow the call sequence
++ * used in the code: drain the timer expiration, drain the pipe,
++ * then clear the stale events.
+ */
++ Assert(drain_timer_events(actx, &expired));
+ Assert(drain_pipe(rfd, 1));
-+ Assert(drain_socket_events(actx));
-+ Assert(set_timer(actx, -1));
++ Assert(comb_multiplexer(actx));
+
++ is(expired, 1, "drain_timer_events() reports expiration");
+ is(timer_expired(actx), 0, "timer is no longer expired");
+ mux_is_not_ready(actx->mux, "when fd is drained and timer reset");
+
@@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
+ if (bidirectional)
+ Assert(drain_pipe(wfd, bidi_pipe_size));
+ }
++
++ /* Ensure comb_multiplexer() can handle multiple stale events. */
++ {
++ int rfd2,
++ wfd2;
++
++ /* Create a second local pipe. */
++ Assert(pipe(pipefd) == 0);
++ rfd2 = pipefd[0];
++ wfd2 = pipefd[1];
++
++ /* Make both rfds appear unidirectional if necessary. */
++ if (bidirectional)
++ {
++ Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
++ Assert(fill_pipe(rfd2) == bidi_pipe_size);
++ }
++
++ /* Register for read events on both fds, and make them readable. */
++ Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, in_event, actx, NULL) == 0);
++
++ Assert(write(wfd, "x", 1) == 1);
++ Assert(write(wfd2, "x", 1) == 1);
++
++ mux_is_ready(actx->mux, deadline, "when two fds are readable");
++
++ /*
++ * Drain both fds. comb_multiplexer() should then ensure that the
++ * mux is no longer readable.
++ */
++ Assert(drain_pipe(rfd, 1));
++ Assert(drain_pipe(rfd2, 1));
++ Assert(comb_multiplexer(actx));
++ mux_is_not_ready(actx->mux, "when two fds are drained");
++
++ /* Stop listening. */
++ Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0);
++ Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, actx, NULL) == 0);
++
++ /* Undo any unidirectional emulation. */
++ if (bidirectional)
++ {
++ Assert(drain_pipe(wfd, bidi_pipe_size));
++ Assert(drain_pipe(wfd2, bidi_pipe_size));
++ }
++
++ close(rfd2);
++ close(wfd2);
++ }
+ }
+
+ close(rfd);
v3-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch
v3-0002-oauth-Ensure-unused-socket-registrations-are-remo.patch
v3-0003-oauth-Remove-expired-timers-from-the-multiplexer.patch
v3-0004-oauth-Track-total-call-count-during-a-client-flow.patch
v3-0005-oauth-Add-unit-tests-for-multiplexer-handling.patch
