On Wed, Aug 6, 2025 at 9:13 AM Jacob Champion
<jacob.champ...@enterprisedb.com> wrote:
> Maybe "drain" would no longer be the
> verb to use there.

I keep describing this as "combing" the queue when I talk about it in
person, so v3-0001 renames this new operation to comb_multiplexer().
And the CI (plus the more strenuous TLS tests) confirms that the
callback count is still stable with this weaker guarantee, so I've
gotten rid of the event-counting code.

Now that I'm no longer counting events, I can collapse the changes to
register_socket(). I can't revert those changes entirely, because then
we regress the case where Curl switches a socket from IN to OUT (this
is enforced by the new unit tests). But I'm not sure that the existing
comment adequately explained that fix anyway, and I didn't remember to
call it out in my initial email, so I've split it out into v3-0002.
It's much smaller.

The tests (now in 0005) have been adjusted for the new "combing"
behavior, and I've added a case to ensure that multiple stale events
are swept up by a single call to comb_multiplexer().

Thanks!
--Jacob
1:  379c12b5d26 < -:  ----------- oauth: Remove stale events from the kqueue 
multiplexer
-:  ----------- > 1:  c5cdccfe374 oauth: Remove stale events from the kqueue 
multiplexer
-:  ----------- > 2:  7725e0c173b oauth: Ensure unused socket registrations are 
removed
2:  f30317d7265 ! 3:  6ccf7a5d156 oauth: Remove expired timers from the 
multiplexer
    @@ src/interfaces/libpq-oauth/oauth-curl.c: 
pg_fe_run_oauth_flow_impl(PGconn *conn)
     +                                   * edge-triggered) timeouts, and ours 
are level-triggered
     +                                   * via the mux.
     +                                   *
    -+                                   * This can't be combined with the 
drain_socket_events()
    -+                                   * call below: we might accidentally 
clear a short timeout
    -+                                   * that was both set and expired during 
the call to
    ++                                   * This can't be combined with the 
comb_multiplexer() call
    ++                                   * below: we might accidentally clear a 
short timeout that
    ++                                   * was both set and expired during the 
call to
     +                                   * drive_request().
     +                                   */
     +                                  if (!drain_timer_events(actx, NULL))
3:  d243d28964d ! 4:  2be993b8f07 oauth: Track total call count during a client 
flow
    @@ Metadata
      ## Commit message ##
         oauth: Track total call count during a client flow
     
    -    Tracking down the bugs that led to the addition of 
drain_socket_events()
    +    Tracking down the bugs that led to the addition of comb_multiplexer()
         and drain_timer_events() was difficult, because an inefficient flow is
         not visibly different from one that is working properly. To help
         maintainers notice when something has gone wrong, track the number of
    @@ src/interfaces/libpq-oauth/oauth-curl.c: struct async_ctx
        bool            used_basic_auth;        /* did we send a client secret? 
*/
        bool            debugging;              /* can we give unsafe developer 
assistance? */
     +  int                     dbg_num_calls;  /* (debug mode) how many times 
were we called? */
    + };
      
    - #if defined(HAVE_SYS_EVENT_H)
    -   int                     nevents;                /* how many events are 
we waiting on? */
    + /*
     @@ src/interfaces/libpq-oauth/oauth-curl.c: PostgresPollingStatusType
      pg_fe_run_oauth_flow(PGconn *conn)
      {
    @@ src/interfaces/libpq-oauth/oauth-curl.c: pg_fe_run_oauth_flow(PGconn 
*conn)
        result = pg_fe_run_oauth_flow_impl(conn);
      
     +  /*
    -+   * To assist with finding bugs in drain_socket_events() and
    ++   * To assist with finding bugs in comb_multiplexer() and
     +   * drain_timer_events(), when we're in debug mode, track the total 
number
     +   * of calls to this function and print that at the end of the flow.
     +   *
4:  ca6fd237653 ! 5:  50257bf32eb oauth: Add unit tests for multiplexer handling
    @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
     +
     +          /*
     +           * Draining the pipe should unset the multiplexer again, once 
the old
    -+           * event is drained.
    ++           * event is cleared.
     +           */
     +          Assert(drain_pipe(rfd, 1));
    -+          Assert(drain_socket_events(actx));
    ++          Assert(comb_multiplexer(actx));
     +          mux_is_not_ready(actx->mux, "when fd is drained");
     +
     +          /* Undo any unidirectional emulation. */
    @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
     +                  ssize_t         written;
     +
     +                  /*
    -+                   * Fill the pipe. Once the old writable event is 
drained, the mux
    ++                   * Fill the pipe. Once the old writable event is 
cleared, the mux
     +                   * should not be ready.
     +                   */
     +                  Assert((written = fill_pipe(wfd)) > 0);
     +                  printf("# pipe buffer is full at %zd bytes\n", written);
     +
    -+                  Assert(drain_socket_events(actx));
    ++                  Assert(comb_multiplexer(actx));
     +                  mux_is_not_ready(actx->mux, "when fd buffer is full");
     +
     +                  /* Drain the pipe again. */
    @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
     +
     +          /* Make sure an expired timer doesn't interfere with event 
draining. */
     +          {
    ++                  bool            expired;
    ++
     +                  /* Make the rfd appear unidirectional if necessary. */
     +                  if (bidirectional)
     +                          Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
    @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
     +                   * Draining the pipe should unset the multiplexer 
again, once the
     +                   * old event is drained and the timer is reset.
     +                   *
    -+                   * Order matters to avoid false negatives. First drain 
the socket,
    -+                   * then unset the timer. We're trying to catch the case 
where the
    -+                   * pending timer expiration event takes the place of 
one of the
    -+                   * socket events we're attempting to drain.
    ++                   * Order matters, since comb_multiplexer() doesn't have 
to remove
    ++                   * stale events when active events exist. Follow the 
call sequence
    ++                   * used in the code: drain the timer expiration, drain 
the pipe,
    ++                   * then clear the stale events.
     +                   */
    ++                  Assert(drain_timer_events(actx, &expired));
     +                  Assert(drain_pipe(rfd, 1));
    -+                  Assert(drain_socket_events(actx));
    -+                  Assert(set_timer(actx, -1));
    ++                  Assert(comb_multiplexer(actx));
     +
    ++                  is(expired, 1, "drain_timer_events() reports 
expiration");
     +                  is(timer_expired(actx), 0, "timer is no longer 
expired");
     +                  mux_is_not_ready(actx->mux, "when fd is drained and 
timer reset");
     +
    @@ src/interfaces/libpq-oauth/test-oauth-curl.c (new)
     +                  if (bidirectional)
     +                          Assert(drain_pipe(wfd, bidi_pipe_size));
     +          }
    ++
    ++          /* Ensure comb_multiplexer() can handle multiple stale events. 
*/
    ++          {
    ++                  int                     rfd2,
    ++                                          wfd2;
    ++
    ++                  /* Create a second local pipe. */
    ++                  Assert(pipe(pipefd) == 0);
    ++                  rfd2 = pipefd[0];
    ++                  wfd2 = pipefd[1];
    ++
    ++                  /* Make both rfds appear unidirectional if necessary. */
    ++                  if (bidirectional)
    ++                  {
    ++                          Assert((bidi_pipe_size = fill_pipe(rfd)) > 0);
    ++                          Assert(fill_pipe(rfd2) == bidi_pipe_size);
    ++                  }
    ++
    ++                  /* Register for read events on both fds, and make them 
readable. */
    ++                  Assert(register_socket(NULL, rfd, in_event, actx, NULL) 
== 0);
    ++                  Assert(register_socket(NULL, rfd2, in_event, actx, 
NULL) == 0);
    ++
    ++                  Assert(write(wfd, "x", 1) == 1);
    ++                  Assert(write(wfd2, "x", 1) == 1);
    ++
    ++                  mux_is_ready(actx->mux, deadline, "when two fds are 
readable");
    ++
    ++                  /*
    ++                   * Drain both fds. comb_multiplexer() should then 
ensure that the
    ++                   * mux is no longer readable.
    ++                   */
    ++                  Assert(drain_pipe(rfd, 1));
    ++                  Assert(drain_pipe(rfd2, 1));
    ++                  Assert(comb_multiplexer(actx));
    ++                  mux_is_not_ready(actx->mux, "when two fds are drained");
    ++
    ++                  /* Stop listening. */
    ++                  Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, 
actx, NULL) == 0);
    ++                  Assert(register_socket(NULL, rfd2, CURL_POLL_REMOVE, 
actx, NULL) == 0);
    ++
    ++                  /* Undo any unidirectional emulation. */
    ++                  if (bidirectional)
    ++                  {
    ++                          Assert(drain_pipe(wfd, bidi_pipe_size));
    ++                          Assert(drain_pipe(wfd2, bidi_pipe_size));
    ++                  }
    ++
    ++                  close(rfd2);
    ++                  close(wfd2);
    ++          }
     +  }
     +
     +  close(rfd);

Attachment: v3-0001-oauth-Remove-stale-events-from-the-kqueue-multipl.patch
Description: Binary data

Attachment: v3-0002-oauth-Ensure-unused-socket-registrations-are-remo.patch
Description: Binary data

Attachment: v3-0003-oauth-Remove-expired-timers-from-the-multiplexer.patch
Description: Binary data

Attachment: v3-0004-oauth-Track-total-call-count-during-a-client-flow.patch
Description: Binary data

Attachment: v3-0005-oauth-Add-unit-tests-for-multiplexer-handling.patch
Description: Binary data

Reply via email to