On Thu, Nov 6, 2025, at 00:21, Chao Li wrote:
> That’s what we don’t know. We now lack a performance test for 
> evaluating how “direct advancement” efficiently helps if it only 
> handles sleeping listeners. So what I was suggesting is that we should 
> first create some tests, maybe also add a few more statistics, so that 
> we can evaluate different solutions. If a simple implementation that 
> only handles sleeping listeners would have performed good enough, of 
> course we can take it; otherwise we may need to either pursue a better 
> solution.

Just for the sake of evaluating this patch, I've added instrumentation
to async.c that increments counters for the different branches in
asyncQueueReadAllNotifications and SignalBackends. (I'm just using
atomics without any locking, which should be fine since this is only
statistics.)

pg_get_async_wakeup_stats-patch.txt adds the SQL-callable
catalog functions pg_reset_async_wakeup_stats() and
pg_get_async_wakeup_stats(). These are not meant to be included in the
final patch; they are just for evaluation. It can be applied on top of
the v23 patch.

Below is an example of how to compile, with an arbitrary mix of
command line options. I've tried a lot of combinations, and we seem to
be holding up fine in all cases I've tried.

async-notify-test-5.c will detect whether the pg_*_async_wakeup_stats()
functions exist, and only shows the extra histograms if they do.

% gcc -Wall -Wextra -O2 -pthread -I/Users/joel/pg19/include/postgresql/server 
-I/Users/joel/pg19/include -o async-notify-test-5 async-notify-test-5.c 
-L/Users/joel/pg19/lib -lpq -pthread -lm

% ./async-notify-test-5 --listeners 10 --notifiers 10 --channels 10 --sleep 0.1 
--sleep-exp 2.0 --batch 10
10 s: 38100 sent (3690/s), 381000 received (36900/s)
Notification Latency Distribution:
 0.00-0.01ms                0 (0.0%) avg: 0.000ms
 0.01-0.10ms     #          23 (0.0%) avg: 0.092ms
 0.10-1.00ms     #######    298002 (78.2%) avg: 0.563ms
 1.00-10.00ms    ##         82975 (21.8%) avg: 1.506ms
 10.00-100.00ms             0 (0.0%) avg: 0.000ms
>100.00ms                  0 (0.0%) avg: 0.000ms

asyncQueueReadAllNotifications Statistics:
necessary_wakeups    ########   35469 (88.2%)
unnecessary_wakeups  #          4762 (11.8%)

SignalBackends Statistics:
signaled_needed     #          34983 (9.5%)
avoided_wakeups     ########   325874 (88.9%)
already_advancing   #          3 (0.0%)
signaled_uncertain  #          5347 (1.5%)
already_ahead       #          375 (0.1%)

Thoughts on how to interpret results:

- Is the notification latency distribution good enough for the given
  workload? Naturally, if the workload is too high, we cannot expect to
  ever achieve sub-millisecond latency anyway, so it's a judgement call.

- Even if "unnecessary_wakeups" is high relative to
  "necessary_wakeups", that's not necessarily a problem if the latency
  distribution is still good enough. We should also consider the ratio
  between "unnecessary_wakeups" and "avoided_wakeups": even if
  "unnecessary_wakeups" is high in absolute numbers, if
  "avoided_wakeups" is orders of magnitude larger, the cost of context
  switching has already been dramatically reduced. There is always a
  risk when optimizing of forgetting what problem one was trying to
  solve initially, usually a bottleneck. Once that bottleneck is gone
  and has moved elsewhere, further effort should IMO usually be spent
  elsewhere too, especially if more optimizations would require a
  significant increase in code complexity.

- It's "signaled_uncertain" that primarily contributes to
  "unnecessary_wakeups".

/Joel

Attachment: async-notify-test-5.c

diff --git a/src/backend/commands/async.c b/src/backend/commands/async.c
index 8dac12f8124..7e8e0b14f42 100644
--- a/src/backend/commands/async.c
+++ b/src/backend/commands/async.c
@@ -137,6 +137,7 @@
 #include <signal.h>
 #include <string.h>
 
+#include "access/htup_details.h"
 #include "access/parallel.h"
 #include "access/slru.h"
 #include "access/transam.h"
@@ -332,6 +333,13 @@ typedef struct AsyncQueueControl
        TimestampTz lastQueueFillWarn;  /* time of last queue-full msg */
        dsa_handle      channelHashDSA;
        dshash_table_handle channelHashDSH;
+       pg_atomic_uint32 signaledNeeded;        /* listening to some of the channels; signal needed */
+       pg_atomic_uint32 avoidedWakeups;        /* directly advanced */
+       pg_atomic_uint32 alreadyAdvancing;      /* already advancing its position */
+       pg_atomic_uint32 signaledUncertain;     /* signaled due to uncertain need */
+       pg_atomic_uint32 alreadyAhead;  /* already ahead, no action needed */
+       pg_atomic_uint32 necessaryWakeups;      /* wakeups where at least one message was interesting */
+       pg_atomic_uint32 unnecessaryWakeups;    /* wakeups where no messages were interesting */
        QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER];
 } AsyncQueueControl;
 
@@ -517,7 +525,8 @@ static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                                                 QueuePosition stop,
                                                                                 char *page_buffer,
-                                                                                Snapshot snapshot);
+                                                                                Snapshot snapshot,
+                                                                                bool *interested);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(bool flush);
 static bool AsyncExistsPendingNotify(Notification *n);
@@ -683,6 +692,13 @@ AsyncShmemInit(void)
                asyncQueueControl->lastQueueFillWarn = 0;
                asyncQueueControl->channelHashDSA = DSA_HANDLE_INVALID;
                asyncQueueControl->channelHashDSH = DSHASH_HANDLE_INVALID;
+               pg_atomic_init_u32(&asyncQueueControl->signaledNeeded, 0);
+               pg_atomic_init_u32(&asyncQueueControl->avoidedWakeups, 0);
+               pg_atomic_init_u32(&asyncQueueControl->alreadyAdvancing, 0);
+               pg_atomic_init_u32(&asyncQueueControl->signaledUncertain, 0);
+               pg_atomic_init_u32(&asyncQueueControl->alreadyAhead, 0);
+               pg_atomic_init_u32(&asyncQueueControl->necessaryWakeups, 0);
+               pg_atomic_init_u32(&asyncQueueControl->unnecessaryWakeups, 0);
 
                for (int i = 0; i < MaxBackends; i++)
                {
@@ -997,6 +1013,81 @@ pg_listening_channels(PG_FUNCTION_ARGS)
        SRF_RETURN_DONE(funcctx);
 }
 
+/*
+ * SQL function: return statistics about NOTIFY wakeups
+ *
+ * This function returns a single row with:
+ * - signaled_needed: listening to some of the channels; signal needed
+ * - avoided_wakeups: directly advanced
+ * - already_advancing: already advancing its position
+ * - signaled_uncertain: signaled due to uncertain need
+ * - already_ahead: already ahead, no action needed
+ * - necessary_wakeups: wakeups where at least one message was interesting
+ * - unnecessary_wakeups: wakeups where no messages were interesting
+ */
+Datum
+pg_get_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+       TupleDesc       tupdesc;
+       Datum           values[7];
+       bool            nulls[7];
+       HeapTuple       tuple;
+       uint32          signaled_needed;
+       uint32          direct_advancements_success;
+       uint32          already_advancing;
+       uint32          signaled_uncertain;
+       uint32          already_ahead;
+       uint32          necessary_wakeups;
+       uint32          unnecessary_wakeups;
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+               ereport(ERROR,
+                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                errmsg("function returning record called in context that cannot accept type record")));
+
+       /* Read the atomic counters */
+       signaled_needed = pg_atomic_read_u32(&asyncQueueControl->signaledNeeded);
+       direct_advancements_success = pg_atomic_read_u32(&asyncQueueControl->avoidedWakeups);
+       already_advancing = pg_atomic_read_u32(&asyncQueueControl->alreadyAdvancing);
+       signaled_uncertain = pg_atomic_read_u32(&asyncQueueControl->signaledUncertain);
+       already_ahead = pg_atomic_read_u32(&asyncQueueControl->alreadyAhead);
+       necessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->necessaryWakeups);
+       unnecessary_wakeups = pg_atomic_read_u32(&asyncQueueControl->unnecessaryWakeups);
+
+       /* Fill in the values */
+       memset(nulls, 0, sizeof(nulls));
+       values[0] = Int64GetDatum((int64) signaled_needed);
+       values[1] = Int64GetDatum((int64) direct_advancements_success);
+       values[2] = Int64GetDatum((int64) already_advancing);
+       values[3] = Int64GetDatum((int64) signaled_uncertain);
+       values[4] = Int64GetDatum((int64) already_ahead);
+       values[5] = Int64GetDatum((int64) necessary_wakeups);
+       values[6] = Int64GetDatum((int64) unnecessary_wakeups);
+
+       tuple = heap_form_tuple(tupdesc, values, nulls);
+       PG_RETURN_DATUM(HeapTupleGetDatum(tuple));
+}
+
+/*
+ * SQL function: reset NOTIFY wakeup statistics
+ *
+ * This function resets all the async wakeup counters to zero.
+ */
+Datum
+pg_reset_async_wakeup_stats(PG_FUNCTION_ARGS)
+{
+       /* Reset all the atomic counters to zero */
+       pg_atomic_write_u32(&asyncQueueControl->signaledNeeded, 0);
+       pg_atomic_write_u32(&asyncQueueControl->avoidedWakeups, 0);
+       pg_atomic_write_u32(&asyncQueueControl->alreadyAdvancing, 0);
+       pg_atomic_write_u32(&asyncQueueControl->signaledUncertain, 0);
+       pg_atomic_write_u32(&asyncQueueControl->alreadyAhead, 0);
+       pg_atomic_write_u32(&asyncQueueControl->necessaryWakeups, 0);
+       pg_atomic_write_u32(&asyncQueueControl->unnecessaryWakeups, 0);
+
+       PG_RETURN_VOID();
+}
+
 /*
  * Async_UnlistenOnExit
  *
@@ -2014,6 +2105,7 @@ SignalBackends(void)
 
                        Assert(pid != InvalidPid);
 
+                       pg_atomic_fetch_add_u32(&asyncQueueControl->signaledNeeded, 1);
                        QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
                        pids[count] = pid;
                        procnos[count] = i;
@@ -2049,7 +2141,14 @@ SignalBackends(void)
                                 * currently advancing its position.
                                 */
                                if (!QUEUE_BACKEND_ADVANCING_POS(i))
+                               {
                                        QUEUE_BACKEND_POS(i) = queueHeadAfterWrite;
+                                       pg_atomic_fetch_add_u32(&asyncQueueControl->avoidedWakeups, 1);
+                               }
+                               else
+                               {
+                                       pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAdvancing, 1);
+                               }
                        }
                        else if (QUEUE_POS_PRECEDES(pos, queueHeadBeforeWrite))
                        {
@@ -2060,6 +2159,7 @@ SignalBackends(void)
                                 */
                                Assert(pid != InvalidPid);
 
+                               pg_atomic_fetch_add_u32(&asyncQueueControl->signaledUncertain, 1);
                                QUEUE_BACKEND_WAKEUP_PENDING(i) = true;
                                pids[count] = pid;
                                procnos[count] = i;
@@ -2071,6 +2171,7 @@ SignalBackends(void)
                                 * The backend is already ahead of the notifications we wrote.
                                 * No need to do anything.
                                 */
+                               pg_atomic_fetch_add_u32(&asyncQueueControl->alreadyAhead, 1);
                                Assert(QUEUE_POS_PRECEDES(queueHeadBeforeWrite, pos));
                        }
                }
@@ -2301,6 +2402,7 @@ asyncQueueReadAllNotifications(void)
        volatile QueuePosition pos;
        QueuePosition head;
        Snapshot        snapshot;
+       bool            interested = false;
 
        /* page_buffer must be adequately aligned, so use a union */
        union
@@ -2435,7 +2537,8 @@ asyncQueueReadAllNotifications(void)
                         */
                        reachedStop = asyncQueueProcessPageEntries(&pos, head,
                                                                                           page_buffer.buf,
-                                                                                          snapshot);
+                                                                                          snapshot,
+                                                                                          &interested);
                } while (!reachedStop);
        }
        PG_FINALLY();
@@ -2450,6 +2553,11 @@ asyncQueueReadAllNotifications(void)
        }
        PG_END_TRY();
 
+       if (interested)
+               pg_atomic_fetch_add_u32(&asyncQueueControl->necessaryWakeups, 1);
+       else
+               pg_atomic_fetch_add_u32(&asyncQueueControl->unnecessaryWakeups, 1);
+
        /* Done with snapshot */
        UnregisterSnapshot(snapshot);
 }
@@ -2474,7 +2582,8 @@ static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                                         QueuePosition stop,
                                                         char *page_buffer,
-                                                        Snapshot snapshot)
+                                                        Snapshot snapshot,
+                                                        bool *interested)
 {
        bool            reachedStop = false;
        bool            reachedEndOfPage;
@@ -2535,6 +2644,9 @@ asyncQueueProcessPageEntries(volatile QueuePosition *current,
                                        char       *payload = qe->data + strlen(channel) + 1;
 
                                        NotifyMyFrontEnd(channel, payload, qe->srcPid);
+
+                                       /* Mark that we were interested in at least one message */
+                                       *interested = true;
                                }
                        }
                        else
diff --git a/src/include/catalog/pg_proc.dat b/src/include/catalog/pg_proc.dat
index 9121a382f76..0bbd7db39c7 100644
--- a/src/include/catalog/pg_proc.dat
+++ b/src/include/catalog/pg_proc.dat
@@ -8571,7 +8571,18 @@
   proname => 'pg_notification_queue_usage', provolatile => 'v',
   proparallel => 'r', prorettype => 'float8', proargtypes => '',
   prosrc => 'pg_notification_queue_usage' },
-
+{ oid => '9315',
+  descr => 'get statistics about NOTIFY wakeups',
+  proname => 'pg_get_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'record', proargtypes => '',
+  proallargtypes => '{int8,int8,int8,int8,int8,int8,int8}', proargmodes => '{o,o,o,o,o,o,o}',
+  proargnames => '{signaled_needed,avoided_wakeups,already_advancing,signaled_uncertain,already_ahead,necessary_wakeups,unnecessary_wakeups}',
+  prosrc => 'pg_get_async_wakeup_stats' },
+{ oid => '9316',
+  descr => 'reset statistics about NOTIFY wakeups',
+  proname => 'pg_reset_async_wakeup_stats', provolatile => 'v',
+  proparallel => 'r', prorettype => 'void', proargtypes => '',
+  prosrc => 'pg_reset_async_wakeup_stats' },
 # shared memory usage
 { oid => '5052', descr => 'allocations from the main shared memory segment',
   proname => 'pg_get_shmem_allocations', prorows => '50', proretset => 't',
